HBASE-11643 Read and write MOB in HBase (Jingcheng Du)

This commit is contained in:
Jonathan M Hsieh 2014-09-04 05:41:21 -07:00
parent bcfc6d65af
commit 5c14f749b3
29 changed files with 3853 additions and 13 deletions

View File

@ -28,4 +28,8 @@ public final class TagType {
public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
// mob tags
public static final byte MOB_REFERENCE_TAG_TYPE = (byte) 5;
public static final byte MOB_TABLE_NAME_TAG_TYPE = (byte) 6;
}

View File

@ -1454,4 +1454,33 @@ possible configurations would overwhelm and obscure the important.
<name>hbase.http.staticuser.user</name>
<value>dr.stack</value>
</property>
<!-- Mob properties. -->
<property>
<name>hbase.mob.file.cache.size</name>
<value>1000</value>
<description>
Number of opened file handlers to cache.
A larger value will benefit reads by providing more file handlers per mob
file cache and would reduce frequent file opening and closing.
However, if this is set too high, this could lead to a "too many opened file handlers"
The default value is 1000.
</description>
</property>
<property>
<name>hbase.mob.cache.evict.period</name>
<value>3600</value>
<description>
The amount of time in seconds before the mob cache evicts cached mob files.
The default value is 3600 seconds.
</description>
</property>
<property>
<name>hbase.mob.cache.evict.remain.ratio</name>
<value>0.5f</value>
<description>
The ratio (between 0.0 and 1.0) of files that remains cached after an eviction
is triggered when the number of cached mob files exceeds the hbase.mob.file.cache.size.
The default value is 0.5f.
</description>
</property>
</configuration>

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@ -43,6 +45,8 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
@InterfaceAudience.Private
public class DeleteTableHandler extends TableEventHandler {
@ -152,10 +156,36 @@ public class DeleteTableHandler extends TableEventHandler {
tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName()));
}
// Archive the mob data if there is a mob-enabled column
HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies();
boolean hasMob = false;
for (HColumnDescriptor hcd : hcds) {
if (MobUtils.isMobFamily(hcd)) {
hasMob = true;
break;
}
}
Path mobTableDir = null;
if (hasMob) {
// Archive mob data
mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME),
tableName);
Path regionDir =
new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
if (fs.exists(regionDir)) {
HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
}
}
// 4. Delete table directory from FS (temp directory)
if (!fs.delete(tempTableDir, true)) {
LOG.error("Couldn't delete " + tempTableDir);
}
// Delete the table directory where the mob files are saved
if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) {
if (!fs.delete(mobTableDir, true)) {
LOG.error("Couldn't delete " + mobTableDir);
}
}
LOG.debug("Table '" + tableName + "' archived!");
}

View File

@ -0,0 +1,114 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
/**
* Cached mob file.
*/
@InterfaceAudience.Private
public class CachedMobFile extends MobFile implements Comparable<CachedMobFile> {
private long accessCount;
private AtomicLong referenceCount = new AtomicLong(0);
public CachedMobFile(StoreFile sf) {
super(sf);
}
public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
CacheConfig cacheConf) throws IOException {
StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
return new CachedMobFile(sf);
}
public void access(long accessCount) {
this.accessCount = accessCount;
}
public int compareTo(CachedMobFile that) {
if (this.accessCount == that.accessCount) return 0;
return this.accessCount < that.accessCount ? 1 : -1;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof CachedMobFile)) {
return false;
}
return compareTo((CachedMobFile) obj) == 0;
}
@Override
public int hashCode() {
return (int)(accessCount ^ (accessCount >>> 32));
}
/**
* Opens the mob file if it's not opened yet and increases the reference.
* It's not thread-safe. Use MobFileCache.openFile() instead.
* The reader of the mob file is just opened when it's not opened no matter how many times
* this open() method is invoked.
* The reference is a counter that how many times this reader is referenced. When the
* reference is 0, this reader is closed.
*/
@Override
public void open() throws IOException {
super.open();
referenceCount.incrementAndGet();
}
/**
* Decreases the reference of the underlying reader for the mob file.
* It's not thread-safe. Use MobFileCache.closeFile() instead.
* This underlying reader isn't closed until the reference is 0.
*/
@Override
public void close() throws IOException {
long refs = referenceCount.decrementAndGet();
if (refs == 0) {
super.close();
}
}
/**
* Gets the reference of the current mob file.
* Internal usage, currently it's for testing.
* @return The reference of the current mob file.
*/
public long getReferenceCount() {
return this.referenceCount.longValue();
}
}

View File

@ -0,0 +1,217 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
/**
* An implementation of the StoreFlusher. It extends the DefaultStoreFlusher.
* If the store is not a mob store, the flusher flushes the MemStore the same with
* DefaultStoreFlusher,
* If the store is a mob store, the flusher flushes the MemStore into two places.
* One is the store files of HBase, the other is the mob files.
* <ol>
* <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li>
* <li>If the size of a cell value is larger than a threshold, it'll be flushed
* to a mob file, another cell with the path of this file will be flushed to HBase.</li>
* <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
* HBase directly.</li>
* </ol>
*
*/
@InterfaceAudience.Private
public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class);
private final Object flushLock = new Object();
private long mobCellValueSizeThreshold = 0;
private Path targetPath;
private HMobStore mobStore;
public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException {
super(conf, store);
mobCellValueSizeThreshold = MobUtils.getMobThreshold(store.getFamily());
this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
store.getColumnFamilyName());
if (!this.store.getFileSystem().exists(targetPath)) {
this.store.getFileSystem().mkdirs(targetPath);
}
this.mobStore = (HMobStore) store;
}
/**
* Flushes the snapshot of the MemStore.
* If this store is not a mob store, flush the cells in the snapshot to store files of HBase.
* If the store is a mob one, the flusher flushes the MemStore into two places.
* One is the store files of HBase, the other is the mob files.
* <ol>
* <li>Cells that are not PUT type or have the delete mark will be directly flushed to
* HBase.</li>
* <li>If the size of a cell value is larger than a threshold, it'll be
* flushed to a mob file, another cell with the path of this file will be flushed to HBase.</li>
* <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
* HBase directly.</li>
* </ol>
*/
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status) throws IOException {
ArrayList<Path> result = new ArrayList<Path>();
int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
StoreFile.Writer writer;
try {
// TODO: We can fail in the below block before we complete adding this flush to
// list of store files. Add cleanup of anything put on filesystem if we fail.
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
false, true, true);
writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
try {
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
// between a normal and a mob store.
performMobFlush(snapshot, cacheFlushId, scanner, writer, status);
} finally {
finalizeWriter(writer, cacheFlushId, status);
}
}
} finally {
scanner.close();
}
LOG.info("Flushed, sequenceid=" + cacheFlushId + ", memsize="
+ snapshot.getSize() + ", hasBloomFilter=" + writer.hasGeneralBloom()
+ ", into tmp file " + writer.getPath());
result.add(writer.getPath());
return result;
}
/**
* Flushes the cells in the mob store.
* <ol>In the mob store, the cells with PUT type might have or have no mob tags.
* <li>If a cell does not have a mob tag, flushing the cell to different files depends
* on the value length. If the length is larger than a threshold, it's flushed to a
* mob file and the mob file is flushed to a store file in HBase. Otherwise, directly
* flush the cell to a store file in HBase.</li>
* <li>If a cell have a mob tag, its value is a mob file name, directly flush it
* to a store file in HBase.</li>
* </ol>
* @param snapshot Memstore snapshot.
* @param cacheFlushId Log cache flush sequence number.
* @param scanner The scanner of memstore snapshot.
* @param writer The store file writer.
* @param status Task that represents the flush operation and may be updated with status.
* @throws IOException
*/
protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException {
StoreFile.Writer mobFileWriter = null;
int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
HConstants.COMPACTION_KV_MAX_DEFAULT);
long mobKVCount = 0;
long time = snapshot.getTimeRangeTracker().getMaximumTimestamp();
mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
// the target path is {tableName}/.mob/{cfName}/mobFiles
// the relative path is mobFiles
byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
try {
Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
.getName());
List<Cell> cells = new ArrayList<Cell>();
boolean hasMore;
do {
hasMore = scanner.next(cells, compactionKVMax);
if (!cells.isEmpty()) {
for (Cell c : cells) {
// If we know that this KV is going to be included always, then let us
// set its memstoreTS to 0. This will help us save space when writing to
// disk.
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
if (kv.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(kv)
|| kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
writer.append(kv);
} else {
// append the original keyValue in the mob file.
mobFileWriter.append(kv);
mobKVCount++;
// append the tags to the KeyValue.
// The key is same, the value is the filename of the mob file
KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
writer.append(reference);
}
}
cells.clear();
}
} while (hasMore);
} finally {
status.setStatus("Flushing mob file " + store + ": appending metadata");
mobFileWriter.appendMetadata(cacheFlushId, false);
status.setStatus("Flushing mob file " + store + ": closing flushed file");
mobFileWriter.close();
}
if (mobKVCount > 0) {
// commit the mob file from temp folder to target folder.
// If the mob file is committed successfully but the store file is not,
// the committed mob file will be handled by the sweep tool as an unused
// file.
mobStore.commitFile(mobFileWriter.getPath(), targetPath);
} else {
try {
// If the mob file is empty, delete it instead of committing.
store.getFileSystem().delete(mobFileWriter.getPath(), true);
} catch (IOException e) {
LOG.error("Fail to delete the temp mob file", e);
}
}
}
}

View File

@ -0,0 +1,56 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
/**
* The cache configuration for the mob.
*/
@InterfaceAudience.Private
public class MobCacheConfig extends CacheConfig {
private static MobFileCache mobFileCache;
public MobCacheConfig(Configuration conf, HColumnDescriptor family) {
super(conf, family);
instantiateMobFileCache(conf);
}
/**
* Instantiates the MobFileCache.
* @param conf The current configuration.
*/
public static synchronized void instantiateMobFileCache(Configuration conf) {
if (mobFileCache == null) {
mobFileCache = new MobFileCache(conf);
}
}
/**
* Gets the MobFileCache.
* @return The MobFileCache.
*/
public MobFileCache getMobFileCache() {
return mobFileCache;
}
}

View File

@ -0,0 +1,61 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.util.Bytes;
/**
* The constants used in mob.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MobConstants {
public static final byte[] IS_MOB = Bytes.toBytes("isMob");
public static final byte[] MOB_THRESHOLD = Bytes.toBytes("mobThreshold");
public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k
public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw";
public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks";
public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size";
public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000;
public static final String MOB_DIR_NAME = "mobdir";
public static final String MOB_REGION_NAME = ".mob";
public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME);
public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period";
public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio";
public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE,
HConstants.EMPTY_BYTE_ARRAY);
public static final float DEFAULT_EVICT_REMAIN_RATIO = 0.5f;
public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
public final static String TEMP_DIR_NAME = ".tmp";
private MobConstants() {
}
}

View File

@ -0,0 +1,140 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
/**
* The mob file.
*/
@InterfaceAudience.Private
public class MobFile {
private StoreFile sf;
// internal use only for sub classes
protected MobFile() {
}
protected MobFile(StoreFile sf) {
this.sf = sf;
}
/**
* Internal use only. This is used by the sweeper.
*
* @return The store file scanner.
* @throws IOException
*/
public StoreFileScanner getScanner() throws IOException {
List<StoreFile> sfs = new ArrayList<StoreFile>();
sfs.add(sf);
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
false, null, sf.getMaxMemstoreTS());
return sfScanners.get(0);
}
/**
* Reads a cell from the mob file.
* @param search The cell need to be searched in the mob file.
* @param cacheMobBlocks Should this scanner cache blocks.
* @return The cell in the mob file.
* @throws IOException
*/
public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
Cell result = null;
StoreFileScanner scanner = null;
List<StoreFile> sfs = new ArrayList<StoreFile>();
sfs.add(sf);
try {
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs,
cacheMobBlocks, true, false, null, sf.getMaxMemstoreTS());
if (!sfScanners.isEmpty()) {
scanner = sfScanners.get(0);
if (scanner.seek(search)) {
result = scanner.peek();
}
}
} finally {
if (scanner != null) {
scanner.close();
}
}
return result;
}
/**
* Gets the file name.
* @return The file name.
*/
public String getFileName() {
return sf.getPath().getName();
}
/**
* Opens the underlying reader.
* It's not thread-safe. Use MobFileCache.openFile() instead.
* @throws IOException
*/
public void open() throws IOException {
if (sf.getReader() == null) {
sf.createReader();
}
}
/**
* Closes the underlying reader, but do no evict blocks belonging to this file.
* It's not thread-safe. Use MobFileCache.closeFile() instead.
* @throws IOException
*/
public void close() throws IOException {
if (sf != null) {
sf.closeReader(false);
sf = null;
}
}
/**
* Creates an instance of the MobFile.
* @param fs The file system.
* @param path The path of the underlying StoreFile.
* @param conf The configuration.
* @param cacheConf The CacheConfig.
* @return An instance of the MobFile.
* @throws IOException
*/
public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf)
throws IOException {
StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
return new MobFile(sf);
}
}

View File

@ -0,0 +1,270 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.IdLock;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The cache for mob files.
* This cache doesn't cache the mob file blocks. It only caches the references of mob files.
* We are doing this to avoid opening and closing mob files all the time. We just keep
* references open.
*/
@InterfaceAudience.Private
public class MobFileCache {
private static final Log LOG = LogFactory.getLog(MobFileCache.class);
/*
* Eviction and statistics thread. Periodically run to print the statistics and
* evict the lru cached mob files when the count of the cached files is larger
* than the threshold.
*/
static class EvictionThread extends Thread {
MobFileCache lru;
public EvictionThread(MobFileCache lru) {
super("MobFileCache.EvictionThread");
setDaemon(true);
this.lru = lru;
}
@Override
public void run() {
lru.evict();
}
}
// a ConcurrentHashMap, accesses to this map are synchronized.
private Map<String, CachedMobFile> map = null;
// caches access count
private final AtomicLong count;
private long lastAccess;
private final AtomicLong miss;
private long lastMiss;
// a lock to sync the evict to guarantee the eviction occurs in sequence.
// the method evictFile is not sync by this lock, the ConcurrentHashMap does the sync there.
private final ReentrantLock evictionLock = new ReentrantLock(true);
//stripes lock on each mob file based on its hash. Sync the openFile/closeFile operations.
private final IdLock keyLock = new IdLock();
private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("MobFileCache #%d").setDaemon(true).build());
private final Configuration conf;
// the count of the cached references to mob files
private final int mobFileMaxCacheSize;
private final boolean isCacheEnabled;
private float evictRemainRatio;
public MobFileCache(Configuration conf) {
this.conf = conf;
this.mobFileMaxCacheSize = conf.getInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY,
MobConstants.DEFAULT_MOB_FILE_CACHE_SIZE);
isCacheEnabled = (mobFileMaxCacheSize > 0);
map = new ConcurrentHashMap<String, CachedMobFile>(mobFileMaxCacheSize);
this.count = new AtomicLong(0);
this.miss = new AtomicLong(0);
this.lastAccess = 0;
this.lastMiss = 0;
if (isCacheEnabled) {
long period = conf.getLong(MobConstants.MOB_CACHE_EVICT_PERIOD,
MobConstants.DEFAULT_MOB_CACHE_EVICT_PERIOD); // in seconds
evictRemainRatio = conf.getFloat(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO,
MobConstants.DEFAULT_EVICT_REMAIN_RATIO);
if (evictRemainRatio < 0.0) {
evictRemainRatio = 0.0f;
LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is less than 0.0, 0.0 is used.");
} else if (evictRemainRatio > 1.0) {
evictRemainRatio = 1.0f;
LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is larger than 1.0, 1.0 is used.");
}
this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period,
TimeUnit.SECONDS);
}
LOG.info("MobFileCache is initialized, and the cache size is " + mobFileMaxCacheSize);
}
/**
* Evicts the lru cached mob files when the count of the cached files is larger
* than the threshold.
*/
public void evict() {
if (isCacheEnabled) {
// Ensure only one eviction at a time
if (!evictionLock.tryLock()) {
return;
}
printStatistics();
List<CachedMobFile> evictedFiles = new ArrayList<CachedMobFile>();
try {
if (map.size() <= mobFileMaxCacheSize) {
return;
}
List<CachedMobFile> files = new ArrayList<CachedMobFile>(map.values());
Collections.sort(files);
int start = (int) (mobFileMaxCacheSize * evictRemainRatio);
if (start >= 0) {
for (int i = start; i < files.size(); i++) {
String name = files.get(i).getFileName();
CachedMobFile evictedFile = map.remove(name);
if (evictedFile != null) {
evictedFiles.add(evictedFile);
}
}
}
} finally {
evictionLock.unlock();
}
// EvictionLock is released. Close the evicted files one by one.
// The closes are sync in the closeFile method.
for (CachedMobFile evictedFile : evictedFiles) {
closeFile(evictedFile);
}
}
}
/**
* Evicts the cached file by the name.
* @param fileName The name of a cached file.
*/
public void evictFile(String fileName) {
if (isCacheEnabled) {
IdLock.Entry lockEntry = null;
try {
// obtains the lock to close the cached file.
lockEntry = keyLock.getLockEntry(fileName.hashCode());
CachedMobFile evictedFile = map.remove(fileName);
if (evictedFile != null) {
evictedFile.close();
}
} catch (IOException e) {
LOG.error("Fail to evict the file " + fileName, e);
} finally {
if (lockEntry != null) {
keyLock.releaseLockEntry(lockEntry);
}
}
}
}
/**
* Opens a mob file.
* @param fs The current file system.
* @param path The file path.
* @param cacheConf The current MobCacheConfig
* @return A opened mob file.
* @throws IOException
*/
public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException {
if (!isCacheEnabled) {
return MobFile.create(fs, path, conf, cacheConf);
} else {
String fileName = path.getName();
CachedMobFile cached = map.get(fileName);
IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode());
try {
if (cached == null) {
cached = map.get(fileName);
if (cached == null) {
if (map.size() > mobFileMaxCacheSize) {
evict();
}
cached = CachedMobFile.create(fs, path, conf, cacheConf);
cached.open();
map.put(fileName, cached);
miss.incrementAndGet();
}
}
cached.open();
cached.access(count.incrementAndGet());
} finally {
keyLock.releaseLockEntry(lockEntry);
}
return cached;
}
}
/**
* Closes a mob file.
* @param file The mob file that needs to be closed.
*/
public void closeFile(MobFile file) {
IdLock.Entry lockEntry = null;
try {
if (!isCacheEnabled) {
file.close();
} else {
lockEntry = keyLock.getLockEntry(file.getFileName().hashCode());
file.close();
}
} catch (IOException e) {
LOG.error("MobFileCache, Exception happen during close " + file.getFileName(), e);
} finally {
if (lockEntry != null) {
keyLock.releaseLockEntry(lockEntry);
}
}
}
/**
* Gets the count of cached mob files.
* @return The count of the cached mob files.
*/
public int getCacheSize() {
return map == null ? 0 : map.size();
}
/**
* Prints the statistics.
*/
public void printStatistics() {
long access = count.get() - lastAccess;
long missed = miss.get() - lastMiss;
long hitRate = (access - missed) * 100 / access;
LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: "
+ (access - missed) + ", hit rate: "
+ ((access == 0) ? 0 : hitRate) + "%");
lastAccess += access;
lastMiss += missed;
}
}

View File

@ -0,0 +1,169 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.MD5Hash;
/**
* The mob file name.
* It consists of a md5 of a start key, a date and an uuid.
* It looks like md5(start) + date + uuid.
* <ol>
* <li>0-31 characters: md5 hex string of a start key. Since the length of the start key is not
* fixed, have to use the md5 instead which has a fix length.</li>
* <li>32-39 characters: a string of a date with format yyyymmdd. The date is the latest timestamp
* of cells in this file</li>
* <li>the remaining characters: the uuid.</li>
* </ol>
* Using md5 hex string of start key as the prefix of file name makes files with the same start
* key unique, they're different from the ones with other start keys
* The cells come from different regions might be in the same mob file by region split,
* this is allowed.
* Has the latest timestamp of cells in the file name in order to clean the expired mob files by
* TTL easily. If this timestamp is older than the TTL, it's regarded as expired.
*/
@InterfaceAudience.Private
public class MobFileName {
private final String date;
private final String startKey;
private final String uuid;
private final String fileName;
/**
* @param startKey
* The start key.
* @param date
* The string of the latest timestamp of cells in this file, the format is yyyymmdd.
* @param uuid
* The uuid
*/
private MobFileName(byte[] startKey, String date, String uuid) {
this.startKey = MD5Hash.getMD5AsHex(startKey, 0, startKey.length);
this.uuid = uuid;
this.date = date;
this.fileName = this.startKey + date + uuid;
}
/**
* @param startKey
* The md5 hex string of the start key.
* @param date
* The string of the latest timestamp of cells in this file, the format is yyyymmdd.
* @param uuid
* The uuid
*/
private MobFileName(String startKey, String date, String uuid) {
this.startKey = startKey;
this.uuid = uuid;
this.date = date;
this.fileName = this.startKey + date + uuid;
}
/**
* Creates an instance of MobFileName
*
* @param startKey
* The start key.
* @param date
* The string of the latest timestamp of cells in this file, the format is yyyymmdd.
* @param uuid The uuid.
* @return An instance of a MobFileName.
*/
public static MobFileName create(byte[] startKey, String date, String uuid) {
return new MobFileName(startKey, date, uuid);
}
/**
* Creates an instance of MobFileName
*
* @param startKey
* The md5 hex string of the start key.
* @param date
* The string of the latest timestamp of cells in this file, the format is yyyymmdd.
* @param uuid The uuid.
* @return An instance of a MobFileName.
*/
public static MobFileName create(String startKey, String date, String uuid) {
return new MobFileName(startKey, date, uuid);
}
/**
* Creates an instance of MobFileName.
* @param fileName The string format of a file name.
* @return An instance of a MobFileName.
*/
public static MobFileName create(String fileName) {
// The format of a file name is md5HexString(0-31bytes) + date(32-39bytes) + UUID
// The date format is yyyyMMdd
String startKey = fileName.substring(0, 32);
String date = fileName.substring(32, 40);
String uuid = fileName.substring(40);
return new MobFileName(startKey, date, uuid);
}
/**
* Gets the hex string of the md5 for a start key.
* @return The hex string of the md5 for a start key.
*/
public String getStartKey() {
return startKey;
}
/**
* Gets the date string. Its format is yyyymmdd.
* @return The date string.
*/
public String getDate() {
return this.date;
}
@Override
public int hashCode() {
StringBuilder builder = new StringBuilder();
builder.append(startKey);
builder.append(date);
builder.append(uuid);
return builder.toString().hashCode();
}
@Override
public boolean equals(Object anObject) {
if (this == anObject) {
return true;
}
if (anObject instanceof MobFileName) {
MobFileName another = (MobFileName) anObject;
if (this.startKey.equals(another.startKey) && this.date.equals(another.date)
&& this.uuid.equals(another.uuid)) {
return true;
}
}
return false;
}
/**
* Gets the file name.
* @return The file name.
*/
public String getFileName() {
return this.fileName;
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.Store;
/**
* MobStoreEngine creates the mob specific compactor, and store flusher.
*/
@InterfaceAudience.Private
public class MobStoreEngine extends DefaultStoreEngine {
@Override
protected void createStoreFlusher(Configuration conf, Store store) throws IOException {
// When using MOB, we use DefaultMobStoreFlusher always
// Just use the compactor and compaction policy as that in DefaultStoreEngine. We can have MOB
// specific compactor and policy when that is implemented.
storeFlusher = new DefaultMobStoreFlusher(conf, store);
}
}

View File

@ -0,0 +1,263 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
/**
* The mob utilities
*/
@InterfaceAudience.Private
public class MobUtils {
private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyyMMdd");
}
};
/**
* Indicates whether the column family is a mob one.
* @param hcd The descriptor of a column family.
* @return True if this column family is a mob one, false if it's not.
*/
public static boolean isMobFamily(HColumnDescriptor hcd) {
byte[] isMob = hcd.getValue(MobConstants.IS_MOB);
return isMob != null && isMob.length == 1 && Bytes.toBoolean(isMob);
}
/**
* Gets the mob threshold.
* If the size of a cell value is larger than this threshold, it's regarded as a mob.
* The default threshold is 1024*100(100K)B.
* @param hcd The descriptor of a column family.
* @return The threshold.
*/
public static long getMobThreshold(HColumnDescriptor hcd) {
byte[] threshold = hcd.getValue(MobConstants.MOB_THRESHOLD);
return threshold != null && threshold.length == Bytes.SIZEOF_LONG ? Bytes.toLong(threshold)
: MobConstants.DEFAULT_MOB_THRESHOLD;
}
/**
* Formats a date to a string.
* @param date The date.
* @return The string format of the date, it's yyyymmdd.
*/
public static String formatDate(Date date) {
return LOCAL_FORMAT.get().format(date);
}
/**
* Parses the string to a date.
* @param dateString The string format of a date, it's yyyymmdd.
* @return A date.
* @throws ParseException
*/
public static Date parseDate(String dateString) throws ParseException {
return LOCAL_FORMAT.get().parse(dateString);
}
/**
* Whether the current cell is a mob reference cell.
* @param cell The current cell.
* @return True if the cell has a mob reference tag, false if it doesn't.
*/
public static boolean isMobReferenceCell(Cell cell) {
if (cell.getTagsLength() > 0) {
Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
TagType.MOB_REFERENCE_TAG_TYPE);
return tag != null;
}
return false;
}
/**
* Whether the tag list has a mob reference tag.
* @param tags The tag list.
* @return True if the list has a mob reference tag, false if it doesn't.
*/
public static boolean hasMobReferenceTag(List<Tag> tags) {
if (!tags.isEmpty()) {
for (Tag tag : tags) {
if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
return true;
}
}
}
return false;
}
/**
* Indicates whether it's a raw scan.
* The information is set in the attribute "hbase.mob.scan.raw" of scan.
* For a mob cell, in a normal scan the scanners retrieves the mob cell from the mob file.
* In a raw scan, the scanner directly returns cell in HBase without retrieve the one in
* the mob file.
* @param scan The current scan.
* @return True if it's a raw scan.
*/
public static boolean isRawMobScan(Scan scan) {
byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
try {
return raw != null && Bytes.toBoolean(raw);
} catch (IllegalArgumentException e) {
return false;
}
}
/**
* Indicates whether the scan contains the information of caching blocks.
* The information is set in the attribute "hbase.mob.cache.blocks" of scan.
* @param scan The current scan.
* @return True when the Scan attribute specifies to cache the MOB blocks.
*/
public static boolean isCacheMobBlocks(Scan scan) {
byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
try {
return cache != null && Bytes.toBoolean(cache);
} catch (IllegalArgumentException e) {
return false;
}
}
/**
* Sets the attribute of caching blocks in the scan.
*
* @param scan
* The current scan.
* @param cacheBlocks
* True, set the attribute of caching blocks into the scan, the scanner with this scan
* caches blocks.
* False, the scanner doesn't cache blocks for this scan.
*/
public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
}
/**
* Gets the root dir of the mob files.
* It's {HBASE_DIR}/mobdir.
* @param conf The current configuration.
* @return the root dir of the mob file.
*/
public static Path getMobHome(Configuration conf) {
Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
return new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
}
/**
* Gets the region dir of the mob files.
* It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
* @param conf The current configuration.
* @param tableName The current table name.
* @return The region dir of the mob files.
*/
public static Path getMobRegionPath(Configuration conf, TableName tableName) {
Path tablePath = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
HRegionInfo regionInfo = getMobRegionInfo(tableName);
return new Path(tablePath, regionInfo.getEncodedName());
}
/**
* Gets the family dir of the mob files.
* It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
* @param conf The current configuration.
* @param tableName The current table name.
* @param familyName The current family name.
* @return The family dir of the mob files.
*/
public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
return new Path(getMobRegionPath(conf, tableName), familyName);
}
/**
* Gets the family dir of the mob files.
* It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
* @param regionPath The path of mob region which is a dummy one.
* @param familyName The current family name.
* @return The family dir of the mob files.
*/
public static Path getMobFamilyPath(Path regionPath, String familyName) {
return new Path(regionPath, familyName);
}
/**
* Gets the HRegionInfo of the mob files.
* This is a dummy region. The mob files are not saved in a region in HBase.
* This is only used in mob snapshot. It's internally used only.
* @param tableName
* @return A dummy mob region info.
*/
public static HRegionInfo getMobRegionInfo(TableName tableName) {
HRegionInfo info = new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES,
HConstants.EMPTY_END_ROW, false, 0);
return info;
}
/**
* Creates a mob reference KeyValue.
* The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
* @param kv The original KeyValue.
* @param fileName The mob file name where the mob reference KeyValue is written.
* @param tableNameTag The tag of the current table name. It's very important in
* cloning the snapshot.
* @return The mob reference KeyValue.
*/
public static KeyValue createMobRefKeyValue(KeyValue kv, byte[] fileName, Tag tableNameTag) {
// Append the tags to the KeyValue.
// The key is same, the value is the filename of the mob file
List<Tag> existingTags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
existingTags.add(MobConstants.MOB_REF_TAG);
// Add the tag of the source table name, this table is where this mob file is flushed
// from.
// It's very useful in cloning the snapshot. When reading from the cloning table, we need to
// find the original mob files by this table name. For details please see cloning
// snapshot for mob files.
existingTags.add(tableNameTag);
long valueLength = kv.getValueLength();
byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put,
refValue, 0, refValue.length, existingTags);
reference.setSequenceId(kv.getSequenceId());
return reference;
}
}

View File

@ -63,6 +63,12 @@ public class DefaultStoreEngine extends StoreEngine<
protected void createComponents(
Configuration conf, Store store, KVComparator kvComparator) throws IOException {
storeFileManager = new DefaultStoreFileManager(kvComparator, conf);
createCompactor(conf, store);
createCompactionPolicy(conf, store);
createStoreFlusher(conf, store);
}
protected void createCompactor(Configuration conf, Store store) throws IOException {
String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName());
try {
compactor = ReflectionUtils.instantiateWithCustomCtor(className,
@ -70,7 +76,10 @@ public class DefaultStoreEngine extends StoreEngine<
} catch (Exception e) {
throw new IOException("Unable to load configured compactor '" + className + "'", e);
}
className = conf.get(
}
protected void createCompactionPolicy(Configuration conf, Store store) throws IOException {
String className = conf.get(
DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName());
try {
compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className,
@ -79,7 +88,10 @@ public class DefaultStoreEngine extends StoreEngine<
} catch (Exception e) {
throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
}
className = conf.get(
}
protected void createStoreFlusher(Configuration conf, Store store) throws IOException {
String className = conf.get(
DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName());
try {
storeFlusher = ReflectionUtils.instantiateWithCustomCtor(className,

View File

@ -0,0 +1,268 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Date;
import java.util.NavigableSet;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFile;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;
/**
* The store implementation to save MOBs (medium objects), it extends the HStore.
* When a descriptor of a column family has the value "is_mob", it means this column family
* is a mob one. When a HRegion instantiate a store for this column family, the HMobStore is
* created.
* HMobStore is almost the same with the HStore except using different types of scanners.
* In the method of getScanner, the MobStoreScanner and MobReversedStoreScanner are returned.
* In these scanners, a additional seeks in the mob files should be performed after the seek
* to HBase is done.
* The store implements how we save MOBs by extending HStore. When a descriptor
* of a column family has the value "isMob", it means this column family is a mob one. When a
* HRegion instantiate a store for this column family, the HMobStore is created. HMobStore is
* almost the same with the HStore except using different types of scanners. In the method of
* getScanner, the MobStoreScanner and MobReversedStoreScanner are returned. In these scanners, a
* additional seeks in the mob files should be performed after the seek in HBase is done.
*/
@InterfaceAudience.Private
public class HMobStore extends HStore {
private MobCacheConfig mobCacheConfig;
private Path homePath;
private Path mobFamilyPath;
public HMobStore(final HRegion region, final HColumnDescriptor family,
final Configuration confParam) throws IOException {
super(region, family, confParam);
this.mobCacheConfig = (MobCacheConfig) cacheConf;
this.homePath = MobUtils.getMobHome(conf);
this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
family.getNameAsString());
}
/**
* Creates the mob cache config.
*/
@Override
protected void createCacheConf(HColumnDescriptor family) {
cacheConf = new MobCacheConfig(conf, family);
}
/**
* Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, a additional seeks in
* the mob files should be performed after the seek in HBase is done.
*/
@Override
protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
long readPt, KeyValueScanner scanner) throws IOException {
if (scanner == null) {
scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
}
return scanner;
}
/**
* Creates the mob store engine.
*/
@Override
protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
KVComparator kvComparator) throws IOException {
MobStoreEngine engine = new MobStoreEngine();
engine.createComponents(conf, store, kvComparator);
return engine;
}
/**
* Gets the temp directory.
* @return The temp directory.
*/
private Path getTempDir() {
return new Path(homePath, MobConstants.TEMP_DIR_NAME);
}
/**
* Creates the temp directory of mob files for flushing.
* @param date The latest date of cells in the flushing.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @return The writer for the mob file.
* @throws IOException
*/
public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey) throws IOException {
if (startKey == null) {
startKey = HConstants.EMPTY_START_ROW;
}
Path path = getTempDir();
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
}
/**
* Creates the temp directory of mob files for flushing.
* @param date The date string, its format is yyyymmmdd.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @return The writer for the mob file.
* @throws IOException
*/
public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey) throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
.toString().replaceAll("-", ""));
final CacheConfig writerCacheConf = mobCacheConfig;
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
.withIncludesMvcc(false).withIncludesTags(true)
.withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
.withBlockSize(getFamily().getBlocksize())
.withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding()).build();
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
.withFilePath(new Path(basePath, mobFileName.getFileName()))
.withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
return w;
}
/**
* Commits the mob file.
* @param sourceFile The source file.
* @param targetPath The directory path where the source file is renamed to.
* @throws IOException
*/
public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
if (sourceFile == null) {
return;
}
Path dstPath = new Path(targetPath, sourceFile.getName());
validateMobFile(sourceFile);
String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
LOG.info(msg);
Path parent = dstPath.getParent();
if (!region.getFilesystem().exists(parent)) {
region.getFilesystem().mkdirs(parent);
}
if (!region.getFilesystem().rename(sourceFile, dstPath)) {
throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
}
}
/**
* Validates a mob file by opening and closing it.
*
* @param path the path to the mob file
*/
private void validateMobFile(Path path) throws IOException {
StoreFile storeFile = null;
try {
storeFile =
new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
storeFile.createReader();
} catch (IOException e) {
LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
throw e;
} finally {
if (storeFile != null) {
storeFile.closeReader(false);
}
}
}
/**
* Reads the cell from the mob file.
* @param reference The cell found in the HBase, its value is a path to a mob file.
* @param cacheBlocks Whether the scanner should cache blocks.
* @return The cell found in the mob file.
* @throws IOException
*/
public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
Cell result = null;
if (reference.getValueLength() > Bytes.SIZEOF_LONG) {
String fileName = Bytes.toString(reference.getValueArray(), reference.getValueOffset()
+ Bytes.SIZEOF_LONG, reference.getValueLength() - Bytes.SIZEOF_LONG);
Path targetPath = new Path(mobFamilyPath, fileName);
MobFile file = null;
try {
file = mobCacheConfig.getMobFileCache().openFile(region.getFilesystem(), targetPath,
mobCacheConfig);
result = file.readCell(reference, cacheBlocks);
} catch (IOException e) {
LOG.error("Fail to open/read the mob file " + targetPath.toString(), e);
} catch (NullPointerException e) {
// When delete the file during the scan, the hdfs getBlockRange will
// throw NullPointerException, catch it and manage it.
LOG.error("Fail to read the mob file " + targetPath.toString()
+ " since it's already deleted", e);
} finally {
if (file != null) {
mobCacheConfig.getMobFileCache().closeFile(file);
}
}
} else {
LOG.warn("Invalid reference to mob, " + reference.getValueLength() + " bytes is too short");
}
if (result == null) {
LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family,"
+ "qualifier,timestamp,type and tags but with an empty value to return.");
result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
reference.getFamilyLength(), reference.getQualifierArray(),
reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
0, 0, reference.getTagsArray(), reference.getTagsOffset(),
reference.getTagsLength());
}
return result;
}
/**
* Gets the mob file path.
* @return The mob file path.
*/
public Path getPath() {
return mobFamilyPath;
}
}

View File

@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -3552,6 +3553,9 @@ public class HRegion implements HeapSize { // , Writable{
}
protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
if (MobUtils.isMobFamily(family)) {
return new HMobStore(this, family, this.conf);
}
return new HStore(this, family, this.conf);
}

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
@ -132,11 +133,11 @@ public class HStore implements Store {
protected final MemStore memstore;
// This stores directory in the filesystem.
private final HRegion region;
protected final HRegion region;
private final HColumnDescriptor family;
private final HRegionFileSystem fs;
private final Configuration conf;
private final CacheConfig cacheConf;
protected final Configuration conf;
protected CacheConfig cacheConf;
private long lastCompactSize = 0;
volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
@ -244,7 +245,7 @@ public class HStore implements Store {
this.offPeakHours = OffPeakHours.getInstance(conf);
// Setting up cache configuration for this family
this.cacheConf = new CacheConfig(conf, family);
createCacheConf(family);
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
@ -263,7 +264,7 @@ public class HStore implements Store {
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
}
this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
@ -337,6 +338,27 @@ public class HStore implements Store {
}
}
/**
* Creates the cache config.
* @param family The current column family.
*/
protected void createCacheConf(final HColumnDescriptor family) {
this.cacheConf = new CacheConfig(conf, family);
}
/**
* Creates the store engine configured for the given Store.
* @param store The store. An unfortunate dependency needed due to it
* being passed to coprocessors via the compactor.
* @param conf Store configuration.
* @param kvComparator KVComparator for storeFileManager.
* @return StoreEngine to use.
*/
protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
KVComparator kvComparator) throws IOException {
return StoreEngine.create(store, conf, comparator);
}
/**
* @param family
* @return TTL in seconds of the specified family
@ -1886,17 +1908,23 @@ public class HStore implements Store {
if (this.getCoprocessorHost() != null) {
scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
}
if (scanner == null) {
scanner = scan.isReversed() ? new ReversedStoreScanner(this,
getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
getScanInfo(), scan, targetCols, readPt);
}
scanner = createScanner(scan, targetCols, readPt, scanner);
return scanner;
} finally {
lock.readLock().unlock();
}
}
protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
long readPt, KeyValueScanner scanner) throws IOException {
if (scanner == null) {
scanner = scan.isReversed() ? new ReversedStoreScanner(this,
getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
getScanInfo(), scan, targetCols, readPt);
}
return scanner;
}
@Override
public String toString() {
return this.getColumnFamilyName();

View File

@ -0,0 +1,69 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobUtils;
/**
* Scanner scans both the memstore and the MOB Store. Coalesce KeyValue stream into List<KeyValue>
* for a single row.
*
*/
@InterfaceAudience.Private
public class MobStoreScanner extends StoreScanner {
private boolean cacheMobBlocks = false;
public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
final NavigableSet<byte[]> columns, long readPt) throws IOException {
super(store, scanInfo, scan, columns, readPt);
cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
}
/**
* Firstly reads the cells from the HBase. If the cell are a reference cell (which has the
* reference tag), the scanner need seek this cell from the mob file, and use the cell found
* from the mob file as the result.
*/
@Override
public boolean next(List<Cell> outResult, int limit) throws IOException {
boolean result = super.next(outResult, limit);
if (!MobUtils.isRawMobScan(scan)) {
// retrieve the mob data
if (outResult.isEmpty()) {
return result;
}
HMobStore mobStore = (HMobStore) store;
for (int i = 0; i < outResult.size(); i++) {
Cell cell = outResult.get(i);
if (MobUtils.isMobReferenceCell(cell)) {
outResult.set(i, mobStore.resolve(cell, cacheMobBlocks));
}
}
}
return result;
}
}

View File

@ -0,0 +1,69 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobUtils;
/**
* ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support
* reversed scanning in both the memstore and the MOB store.
*
*/
@InterfaceAudience.Private
public class ReversedMobStoreScanner extends ReversedStoreScanner {
private boolean cacheMobBlocks = false;
ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
super(store, scanInfo, scan, columns, readPt);
cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
}
/**
* Firstly reads the cells from the HBase. If the cell are a reference cell (which has the
* reference tag), the scanner need seek this cell from the mob file, and use the cell found
* from the mob file as the result.
*/
@Override
public boolean next(List<Cell> outResult, int limit) throws IOException {
boolean result = super.next(outResult, limit);
if (!MobUtils.isRawMobScan(scan)) {
// retrieve the mob data
if (outResult.isEmpty()) {
return result;
}
HMobStore mobStore = (HMobStore) store;
for (int i = 0; i < outResult.size(); i++) {
Cell cell = outResult.get(i);
if (MobUtils.isMobReferenceCell(cell)) {
outResult.set(i, mobStore.resolve(cell, cacheMobBlocks));
}
}
}
return result;
}
}

View File

@ -915,7 +915,7 @@ public class StoreFile {
return this.writer.getPath();
}
boolean hasGeneralBloom() {
public boolean hasGeneralBloom() {
return this.generalBloomFilterWriter != null;
}

View File

@ -0,0 +1,86 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
public class MobTestUtil {
protected static final char FIRST_CHAR = 'a';
protected static final char LAST_CHAR = 'z';
protected static String generateRandomString(int demoLength) {
String base = "abcdefghijklmnopqrstuvwxyz";
Random random = new Random();
StringBuffer sb = new StringBuffer();
for (int i = 0; i < demoLength; i++) {
int number = random.nextInt(base.length());
sb.append(base.charAt(number));
}
return sb.toString();
}
protected static void writeStoreFile(final StoreFile.Writer writer, String caseName)
throws IOException {
writeStoreFile(writer, Bytes.toBytes(caseName), Bytes.toBytes(caseName));
}
/*
* Writes HStoreKey and ImmutableBytes data to passed writer and then closes
* it.
*
* @param writer
*
* @throws IOException
*/
private static void writeStoreFile(final StoreFile.Writer writer, byte[] fam,
byte[] qualifier) throws IOException {
long now = System.currentTimeMillis();
try {
for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
byte[] b = new byte[] { (byte) d, (byte) e };
writer.append(new KeyValue(b, fam, qualifier, now, b));
}
}
} finally {
writer.close();
}
}
/**
* Compare two KeyValue only for their row family qualifier value
*/
public static void assertKeyValuesEquals(KeyValue firstKeyValue,
KeyValue secondKeyValue) {
Assert.assertEquals(Bytes.toString(CellUtil.cloneRow(firstKeyValue)),
Bytes.toString(CellUtil.cloneRow(secondKeyValue)));
Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(firstKeyValue)),
Bytes.toString(CellUtil.cloneFamily(secondKeyValue)));
Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(firstKeyValue)),
Bytes.toString(CellUtil.cloneQualifier(secondKeyValue)));
Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(firstKeyValue)),
Bytes.toString(CellUtil.cloneValue(secondKeyValue)));
}
}

View File

@ -0,0 +1,154 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestCachedMobFile extends TestCase{
static final Log LOG = LogFactory.getLog(TestCachedMobFile.class);
private Configuration conf = HBaseConfiguration.create();
private CacheConfig cacheConf = new CacheConfig(conf);
private final String TABLE = "tableName";
private final String FAMILY = "familyName";
private final String FAMILY1 = "familyName1";
private final String FAMILY2 = "familyName2";
private final long EXPECTED_REFERENCE_ZERO = 0;
private final long EXPECTED_REFERENCE_ONE = 1;
private final long EXPECTED_REFERENCE_TWO = 2;
@Test
public void testOpenClose() throws Exception {
String caseName = getName();
FileSystem fs = FileSystem.get(conf);
Path testDir = FSUtils.getRootDir(conf);
Path outputDir = new Path(new Path(testDir, TABLE),
FAMILY);
HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
.withOutputDir(outputDir).withFileContext(meta).build();
MobTestUtil.writeStoreFile(writer, caseName);
CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf);
Assert.assertEquals(EXPECTED_REFERENCE_ZERO, cachedMobFile.getReferenceCount());
cachedMobFile.open();
Assert.assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile.getReferenceCount());
cachedMobFile.open();
Assert.assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile.getReferenceCount());
cachedMobFile.close();
Assert.assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile.getReferenceCount());
cachedMobFile.close();
Assert.assertEquals(EXPECTED_REFERENCE_ZERO, cachedMobFile.getReferenceCount());
}
@Test
public void testCompare() throws Exception {
String caseName = getName();
FileSystem fs = FileSystem.get(conf);
Path testDir = FSUtils.getRootDir(conf);
Path outputDir1 = new Path(new Path(testDir, TABLE),
FAMILY1);
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
StoreFile.Writer writer1 = new StoreFile.WriterBuilder(conf, cacheConf, fs)
.withOutputDir(outputDir1).withFileContext(meta).build();
MobTestUtil.writeStoreFile(writer1, caseName);
CachedMobFile cachedMobFile1 = CachedMobFile.create(fs, writer1.getPath(), conf, cacheConf);
Path outputDir2 = new Path(new Path(testDir, TABLE),
FAMILY2);
StoreFile.Writer writer2 = new StoreFile.WriterBuilder(conf, cacheConf, fs)
.withOutputDir(outputDir2)
.withFileContext(meta)
.build();
MobTestUtil.writeStoreFile(writer2, caseName);
CachedMobFile cachedMobFile2 = CachedMobFile.create(fs, writer2.getPath(), conf, cacheConf);
cachedMobFile1.access(1);
cachedMobFile2.access(2);
Assert.assertEquals(cachedMobFile1.compareTo(cachedMobFile2), 1);
Assert.assertEquals(cachedMobFile2.compareTo(cachedMobFile1), -1);
Assert.assertEquals(cachedMobFile1.compareTo(cachedMobFile1), 0);
}
@Test
public void testReadKeyValue() throws Exception {
FileSystem fs = FileSystem.get(conf);
Path testDir = FSUtils.getRootDir(conf);
Path outputDir = new Path(new Path(testDir, TABLE), "familyname");
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
.withOutputDir(outputDir).withFileContext(meta).build();
String caseName = getName();
MobTestUtil.writeStoreFile(writer, caseName);
CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf);
byte[] family = Bytes.toBytes(caseName);
byte[] qualify = Bytes.toBytes(caseName);
// Test the start key
byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
KeyValue expectedKey =
new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
KeyValue seekKey = expectedKey.createKeyOnly(false);
KeyValue kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
// Test the end key
byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
seekKey = expectedKey.createKeyOnly(false);
kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
// Test the random key
byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
seekKey = expectedKey.createKeyOnly(false);
kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
// Test the key which is less than the start key
byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
// Test the key which is more than the end key
byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
Assert.assertNull(kv);
}
}

View File

@ -0,0 +1,193 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestDefaultMobStoreFlusher {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] row1 = Bytes.toBytes("row1");
private final static byte [] row2 = Bytes.toBytes("row2");
private final static byte [] family = Bytes.toBytes("family");
private final static byte [] qf1 = Bytes.toBytes("qf1");
private final static byte [] qf2 = Bytes.toBytes("qf2");
private final static byte [] value1 = Bytes.toBytes("value1");
private final static byte [] value2 = Bytes.toBytes("value2");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testFlushNonMobFile() throws InterruptedException {
String TN = "testFlushNonMobFile";
HTable table = null;
HBaseAdmin admin = null;
try {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TN));
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMaxVersions(4);
desc.addFamily(hcd);
admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
admin.createTable(desc);
table = new HTable(TEST_UTIL.getConfiguration(), TN);
//Put data
Put put0 = new Put(row1);
put0.add(family, qf1, 1, value1);
table.put(put0);
//Put more data
Put put1 = new Put(row2);
put1.add(family, qf2, 1, value2);
table.put(put1);
//Flush
table.flushCommits();
admin.flush(TN);
Scan scan = new Scan();
scan.addColumn(family, qf1);
scan.setMaxVersions(4);
ResultScanner scanner = table.getScanner(scan);
//Compare
Result result = scanner.next();
int size = 0;
while (result != null) {
size++;
List<Cell> cells = result.getColumnCells(family, qf1);
// Verify the cell size
Assert.assertEquals(1, cells.size());
// Verify the value
Assert.assertEquals(Bytes.toString(value1),
Bytes.toString(CellUtil.cloneValue(cells.get(0))));
result = scanner.next();
}
scanner.close();
Assert.assertEquals(1, size);
admin.close();
} catch (MasterNotRunningException e1) {
e1.printStackTrace();
} catch (ZooKeeperConnectionException e2) {
e2.printStackTrace();
} catch (IOException e3) {
e3.printStackTrace();
}
}
@Test
public void testFlushMobFile() throws InterruptedException {
String TN = "testFlushMobFile";
HTable table = null;
HBaseAdmin admin = null;
try {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TN));
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
hcd.setMaxVersions(4);
desc.addFamily(hcd);
admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
admin.createTable(desc);
table = new HTable(TEST_UTIL.getConfiguration(), TN);
//put data
Put put0 = new Put(row1);
put0.add(family, qf1, 1, value1);
table.put(put0);
//put more data
Put put1 = new Put(row2);
put1.add(family, qf2, 1, value2);
table.put(put1);
//flush
table.flushCommits();
admin.flush(TN);
//Scan
Scan scan = new Scan();
scan.addColumn(family, qf1);
scan.setMaxVersions(4);
ResultScanner scanner = table.getScanner(scan);
//Compare
Result result = scanner.next();
int size = 0;
while (result != null) {
size++;
List<Cell> cells = result.getColumnCells(family, qf1);
// Verify the the cell size
Assert.assertEquals(1, cells.size());
// Verify the value
Assert.assertEquals(Bytes.toString(value1),
Bytes.toString(CellUtil.cloneValue(cells.get(0))));
result = scanner.next();
}
scanner.close();
Assert.assertEquals(1, size);
admin.close();
} catch (MasterNotRunningException e1) {
e1.printStackTrace();
} catch (ZooKeeperConnectionException e2) {
e2.printStackTrace();
} catch (IOException e3) {
e3.printStackTrace();
}
}
}

View File

@ -0,0 +1,141 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestMobDataBlockEncoding {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] row1 = Bytes.toBytes("row1");
private final static byte [] family = Bytes.toBytes("family");
private final static byte [] qf1 = Bytes.toBytes("qualifier1");
private final static byte [] qf2 = Bytes.toBytes("qualifier2");
protected final byte[] qf3 = Bytes.toBytes("qualifier3");
private static HTable table;
private static HBaseAdmin admin;
private static HColumnDescriptor hcd;
private static HTableDescriptor desc;
private static Random random = new Random();
private static long defaultThreshold = 10;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
public void setUp(long threshold, String TN, DataBlockEncoding encoding)
throws Exception {
desc = new HTableDescriptor(TableName.valueOf(TN));
hcd = new HColumnDescriptor(family);
hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(threshold));
hcd.setMaxVersions(4);
hcd.setDataBlockEncoding(encoding);
desc.addFamily(hcd);
admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
admin.createTable(desc);
table = new HTable(TEST_UTIL.getConfiguration(), TN);
}
/**
* Generate the mob value.
*
* @param size the size of the value
* @return the mob value generated
*/
private static byte[] generateMobValue(int size) {
byte[] mobVal = new byte[size];
random.nextBytes(mobVal);
return mobVal;
}
@Test
public void testDataBlockEncoding() throws Exception {
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
testDataBlockEncoding(encoding);
}
}
public void testDataBlockEncoding(DataBlockEncoding encoding) throws Exception {
String TN = "testDataBlockEncoding" + encoding;
setUp(defaultThreshold, TN, encoding);
long ts1 = System.currentTimeMillis();
long ts2 = ts1 + 1;
long ts3 = ts1 + 2;
byte[] value = generateMobValue((int) defaultThreshold + 1);
Put put1 = new Put(row1);
put1.add(family, qf1, ts3, value);
put1.add(family, qf2, ts2, value);
put1.add(family, qf3, ts1, value);
table.put(put1);
table.flushCommits();
admin.flush(TN);
Scan scan = new Scan();
scan.setMaxVersions(4);
ResultScanner results = table.getScanner(scan);
int count = 0;
for (Result res : results) {
List<Cell> cells = res.listCells();
for(Cell cell : cells) {
// Verify the value
Assert.assertEquals(Bytes.toString(value),
Bytes.toString(CellUtil.cloneValue(cell)));
count++;
}
}
results.close();
Assert.assertEquals(3, count);
}
}

View File

@ -0,0 +1,124 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestMobFile extends TestCase {
static final Log LOG = LogFactory.getLog(TestMobFile.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private Configuration conf = TEST_UTIL.getConfiguration();
private CacheConfig cacheConf = new CacheConfig(conf);
private final String TABLE = "tableName";
private final String FAMILY = "familyName";
@Test
public void testReadKeyValue() throws Exception {
FileSystem fs = FileSystem.get(conf);
Path testDir = FSUtils.getRootDir(conf);
Path outputDir = new Path(new Path(testDir, TABLE), FAMILY);
HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
.withOutputDir(outputDir)
.withFileContext(meta)
.build();
String caseName = getName();
MobTestUtil.writeStoreFile(writer, caseName);
MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(),
conf, cacheConf, BloomType.NONE));
byte[] family = Bytes.toBytes(caseName);
byte[] qualify = Bytes.toBytes(caseName);
// Test the start key
byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
KeyValue expectedKey =
new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
KeyValue seekKey = expectedKey.createKeyOnly(false);
KeyValue kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
// Test the end key
byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
seekKey = expectedKey.createKeyOnly(false);
kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
// Test the random key
byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
seekKey = expectedKey.createKeyOnly(false);
kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
// Test the key which is less than the start key
byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
// Test the key which is more than the end key
byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
assertNull(kv);
}
@Test
public void testGetScanner() throws Exception {
FileSystem fs = FileSystem.get(conf);
Path testDir = FSUtils.getRootDir(conf);
Path outputDir = new Path(new Path(testDir, TABLE), FAMILY);
HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
.withOutputDir(outputDir)
.withFileContext(meta)
.build();
MobTestUtil.writeStoreFile(writer, getName());
MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(),
conf, cacheConf, BloomType.NONE));
assertNotNull(mobFile.getScanner());
assertTrue(mobFile.getScanner() instanceof StoreFileScanner);
}
}

View File

@ -0,0 +1,206 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.Date;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestMobFileCache extends TestCase {
static final Log LOG = LogFactory.getLog(TestMobFileCache.class);
private HBaseTestingUtility UTIL;
private HRegion region;
private Configuration conf;
private MobCacheConfig mobCacheConf;
private MobFileCache mobFileCache;
private Date currentDate = new Date();
private final String TEST_CACHE_SIZE = "2";
private final int EXPECTED_CACHE_SIZE_ZERO = 0;
private final int EXPECTED_CACHE_SIZE_ONE = 1;
private final int EXPECTED_CACHE_SIZE_TWO = 2;
private final int EXPECTED_CACHE_SIZE_THREE = 3;
private final long EXPECTED_REFERENCE_ONE = 1;
private final long EXPECTED_REFERENCE_TWO = 2;
private final String TABLE = "tableName";
private final String FAMILY1 = "family1";
private final String FAMILY2 = "family2";
private final String FAMILY3 = "family3";
private final byte[] ROW = Bytes.toBytes("row");
private final byte[] ROW2 = Bytes.toBytes("row2");
private final byte[] VALUE = Bytes.toBytes("value");
private final byte[] VALUE2 = Bytes.toBytes("value2");
private final byte[] QF1 = Bytes.toBytes("qf1");
private final byte[] QF2 = Bytes.toBytes("qf2");
private final byte[] QF3 = Bytes.toBytes("qf3");
@Override
public void setUp() throws Exception {
UTIL = HBaseTestingUtility.createLocalHTU();
conf = UTIL.getConfiguration();
HTableDescriptor htd = UTIL.createTableDescriptor("testMobFileCache");
HColumnDescriptor hcd1 = new HColumnDescriptor(FAMILY1);
hcd1.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
hcd1.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
HColumnDescriptor hcd2 = new HColumnDescriptor(FAMILY2);
hcd2.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
hcd2.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
HColumnDescriptor hcd3 = new HColumnDescriptor(FAMILY3);
hcd3.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
hcd3.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
htd.addFamily(hcd1);
htd.addFamily(hcd2);
htd.addFamily(hcd3);
region = UTIL.createLocalHRegion(htd, null, null);
}
@Override
protected void tearDown() throws Exception {
region.close();
region.getFilesystem().delete(UTIL.getDataTestDir(), true);
}
/**
* Create the mob store file.
* @param family
*/
private Path createMobStoreFile(String family) throws IOException {
return createMobStoreFile(HBaseConfiguration.create(), family);
}
/**
* Create the mob store file
* @param conf
* @param family
*/
private Path createMobStoreFile(Configuration conf, String family) throws IOException {
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMaxVersions(4);
hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
mobCacheConf = new MobCacheConfig(conf, hcd);
return createMobStoreFile(conf, hcd);
}
/**
* Create the mob store file
* @param conf
* @param hcd
*/
private Path createMobStoreFile(Configuration conf, HColumnDescriptor hcd)
throws IOException {
// Setting up a Store
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
htd.addFamily(hcd);
HMobStore mobStore = (HMobStore) region.getStore(hcd.getName());
KeyValue key1 = new KeyValue(ROW, hcd.getName(), QF1, 1, VALUE);
KeyValue key2 = new KeyValue(ROW, hcd.getName(), QF2, 1, VALUE);
KeyValue key3 = new KeyValue(ROW2, hcd.getName(), QF3, 1, VALUE2);
KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
int maxKeyCount = keys.length;
HRegionInfo regionInfo = new HRegionInfo();
StoreFile.Writer mobWriter = mobStore.createWriterInTmp(currentDate,
maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey());
Path mobFilePath = mobWriter.getPath();
String fileName = mobFilePath.getName();
mobWriter.append(key1);
mobWriter.append(key2);
mobWriter.append(key3);
mobWriter.close();
String targetPathName = MobUtils.formatDate(currentDate);
Path targetPath = new Path(mobStore.getPath(), targetPathName);
mobStore.commitFile(mobFilePath, targetPath);
return new Path(targetPath, fileName);
}
@Test
public void testMobFileCache() throws Exception {
FileSystem fs = FileSystem.get(conf);
conf.set(MobConstants.MOB_FILE_CACHE_SIZE_KEY, TEST_CACHE_SIZE);
mobFileCache = new MobFileCache(conf);
Path file1Path = createMobStoreFile(FAMILY1);
Path file2Path = createMobStoreFile(FAMILY2);
Path file3Path = createMobStoreFile(FAMILY3);
// Before open one file by the MobFileCache
assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize());
// Open one file by the MobFileCache
CachedMobFile cachedMobFile1 = (CachedMobFile) mobFileCache.openFile(
fs, file1Path, mobCacheConf);
assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
assertNotNull(cachedMobFile1);
assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
// The evict is also managed by a schedule thread pool.
// And its check period is set as 3600 seconds by default.
// This evict should get the lock at the most time
mobFileCache.evict(); // Cache not full, evict it
assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
mobFileCache.evictFile(file1Path.getName()); // Evict one file
assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize());
assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile1.getReferenceCount());
cachedMobFile1.close(); // Close the cached mob file
// Reopen three cached file
cachedMobFile1 = (CachedMobFile) mobFileCache.openFile(
fs, file1Path, mobCacheConf);
assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
CachedMobFile cachedMobFile2 = (CachedMobFile) mobFileCache.openFile(
fs, file2Path, mobCacheConf);
assertEquals(EXPECTED_CACHE_SIZE_TWO, mobFileCache.getCacheSize());
CachedMobFile cachedMobFile3 = (CachedMobFile) mobFileCache.openFile(
fs, file3Path, mobCacheConf);
// Before the evict
// Evict the cache, should clost the first file 1
assertEquals(EXPECTED_CACHE_SIZE_THREE, mobFileCache.getCacheSize());
assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile2.getReferenceCount());
assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile3.getReferenceCount());
mobFileCache.evict();
assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile1.getReferenceCount());
assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile2.getReferenceCount());
assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile3.getReferenceCount());
}
}

View File

@ -0,0 +1,79 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
import junit.framework.TestCase;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestMobFileName extends TestCase {
private String uuid;
private Date date;
private String dateStr;
private byte[] startKey;
public void setUp() {
Random random = new Random();
uuid = UUID.randomUUID().toString().replaceAll("-", "");
date = new Date();
dateStr = MobUtils.formatDate(date);
startKey = Bytes.toBytes(random.nextInt());
}
@Test
public void testHashCode() {
assertEquals(MobFileName.create(startKey, dateStr, uuid).hashCode(),
MobFileName.create(startKey, dateStr, uuid).hashCode());
assertNotSame(MobFileName.create(startKey, dateStr, uuid).hashCode(),
MobFileName.create(startKey, dateStr, uuid).hashCode());
}
@Test
public void testCreate() {
MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid);
assertEquals(mobFileName, MobFileName.create(mobFileName.getFileName()));
}
@Test
public void testGet() {
MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid);
assertEquals(MD5Hash.getMD5AsHex(startKey, 0, startKey.length), mobFileName.getStartKey());
assertEquals(dateStr, mobFileName.getDate());
assertEquals(mobFileName.getFileName(), MD5Hash.getMD5AsHex(startKey, 0, startKey.length)
+ dateStr + uuid);
}
@Test
public void testEquals() {
MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid);
assertTrue(mobFileName.equals(mobFileName));
assertFalse(mobFileName.equals(this));
assertTrue(mobFileName.equals(MobFileName.create(startKey, dateStr, uuid)));
}
}

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestDeleteMobTable {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte[] FAMILY = Bytes.toBytes("family");
private final static byte[] QF = Bytes.toBytes("qualifier");
private static Random random = new Random();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Generate the mob value.
*
* @param size
* the size of the value
* @return the mob value generated
*/
private static byte[] generateMobValue(int size) {
byte[] mobVal = new byte[size];
random.nextBytes(mobVal);
return mobVal;
}
@Test
public void testDeleteMobTable() throws Exception {
byte[] tableName = Bytes.toBytes("testDeleteMobTable");
TableName tn = TableName.valueOf(tableName);
HTableDescriptor htd = new HTableDescriptor(tn);
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
htd.addFamily(hcd);
HBaseAdmin admin = null;
HTable table = null;
try {
admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
admin.createTable(htd);
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
byte[] value = generateMobValue(10);
byte[] row = Bytes.toBytes("row");
Put put = new Put(row);
put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
table.put(put);
table.flushCommits();
admin.flush(tableName);
// the mob file exists
Assert.assertEquals(1, countMobFiles(tn, hcd.getNameAsString()));
Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
String fileName = assertHasOneMobRow(table, tn, hcd.getNameAsString());
Assert.assertFalse(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
Assert.assertTrue(mobTableDirExist(tn));
table.close();
admin.disableTable(tn);
admin.deleteTable(tn);
Assert.assertFalse(admin.tableExists(tn));
Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
Assert.assertEquals(1, countArchiveMobFiles(tn, hcd.getNameAsString()));
Assert.assertTrue(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
Assert.assertFalse(mobTableDirExist(tn));
} finally {
if (admin != null) {
admin.close();
}
}
}
@Test
public void testDeleteNonMobTable() throws Exception {
byte[] tableName = Bytes.toBytes("testDeleteNonMobTable");
TableName tn = TableName.valueOf(tableName);
HTableDescriptor htd = new HTableDescriptor(tn);
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
htd.addFamily(hcd);
HBaseAdmin admin = null;
HTable table = null;
try {
admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
admin.createTable(htd);
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
byte[] value = generateMobValue(10);
byte[] row = Bytes.toBytes("row");
Put put = new Put(row);
put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
table.put(put);
table.flushCommits();
admin.flush(tableName);
table.close();
// the mob file doesn't exist
Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
Assert.assertFalse(mobTableDirExist(tn));
admin.disableTable(tn);
admin.deleteTable(tn);
Assert.assertFalse(admin.tableExists(tn));
Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
Assert.assertFalse(mobTableDirExist(tn));
} finally {
if (admin != null) {
admin.close();
}
}
}
private int countMobFiles(TableName tn, String familyName) throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path mobFileDir = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName);
if (fs.exists(mobFileDir)) {
return fs.listStatus(mobFileDir).length;
} else {
return 0;
}
}
private int countArchiveMobFiles(TableName tn, String familyName)
throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
if (fs.exists(storePath)) {
return fs.listStatus(storePath).length;
} else {
return 0;
}
}
private boolean mobTableDirExist(TableName tn) throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path tableDir = FSUtils.getTableDir(MobUtils.getMobHome(TEST_UTIL.getConfiguration()), tn);
return fs.exists(tableDir);
}
private boolean mobArchiveExist(TableName tn, String familyName, String fileName)
throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
return fs.exists(new Path(storePath, fileName));
}
private String assertHasOneMobRow(HTable table, TableName tn, String familyName)
throws IOException {
Scan scan = new Scan();
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
ResultScanner rs = table.getScanner(scan);
Result r = rs.next();
Assert.assertNotNull(r);
byte[] value = r.getValue(FAMILY, QF);
String fileName = Bytes.toString(value, Bytes.SIZEOF_LONG, value.length - Bytes.SIZEOF_LONG);
Path filePath = new Path(
MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName), fileName);
FileSystem fs = TEST_UTIL.getTestFileSystem();
Assert.assertTrue(fs.exists(filePath));
r = rs.next();
Assert.assertNull(r);
return fileName;
}
}

View File

@ -0,0 +1,471 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
@Category(MediumTests.class)
public class TestHMobStore {
public static final Log LOG = LogFactory.getLog(TestHMobStore.class);
@Rule public TestName name = new TestName();
private HMobStore store;
private HRegion region;
private HColumnDescriptor hcd;
private FileSystem fs;
private byte [] table = Bytes.toBytes("table");
private byte [] family = Bytes.toBytes("family");
private byte [] row = Bytes.toBytes("row");
private byte [] row2 = Bytes.toBytes("row2");
private byte [] qf1 = Bytes.toBytes("qf1");
private byte [] qf2 = Bytes.toBytes("qf2");
private byte [] qf3 = Bytes.toBytes("qf3");
private byte [] qf4 = Bytes.toBytes("qf4");
private byte [] qf5 = Bytes.toBytes("qf5");
private byte [] qf6 = Bytes.toBytes("qf6");
private byte[] value = Bytes.toBytes("value");
private byte[] value2 = Bytes.toBytes("value2");
private Path mobFilePath;
private Date currentDate = new Date();
private KeyValue seekKey1;
private KeyValue seekKey2;
private KeyValue seekKey3;
private NavigableSet<byte[]> qualifiers =
new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
private List<Cell> expected = new ArrayList<Cell>();
private long id = System.currentTimeMillis();
private Get get = new Get(row);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final String DIR = TEST_UTIL.getDataTestDir("TestHMobStore").toString();
/**
* Setup
* @throws Exception
*/
@Before
public void setUp() throws Exception {
qualifiers.add(qf1);
qualifiers.add(qf3);
qualifiers.add(qf5);
Iterator<byte[]> iter = qualifiers.iterator();
while(iter.hasNext()){
byte [] next = iter.next();
expected.add(new KeyValue(row, family, next, 1, value));
get.addColumn(family, next);
get.setMaxVersions(); // all versions.
}
}
private void init(String methodName, Configuration conf, boolean testStore)
throws IOException {
hcd = new HColumnDescriptor(family);
hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
hcd.setMaxVersions(4);
init(methodName, conf, hcd, testStore);
}
private void init(String methodName, Configuration conf,
HColumnDescriptor hcd, boolean testStore) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
init(methodName, conf, htd, hcd, testStore);
}
private void init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd, boolean testStore) throws IOException {
//Setting up tje Region and Store
Path basedir = new Path(DIR+methodName);
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
String logName = "logs";
Path logdir = new Path(basedir, logName);
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
HLog hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
store = new HMobStore(region, hcd, conf);
if(testStore) {
init(conf, hcd);
}
}
private void init(Configuration conf, HColumnDescriptor hcd)
throws IOException {
Path basedir = FSUtils.getRootDir(conf);
fs = FileSystem.get(conf);
Path homePath = new Path(basedir, Bytes.toString(family) + Path.SEPARATOR
+ Bytes.toString(family));
fs.mkdirs(homePath);
KeyValue key1 = new KeyValue(row, family, qf1, 1, value);
KeyValue key2 = new KeyValue(row, family, qf2, 1, value);
KeyValue key3 = new KeyValue(row2, family, qf3, 1, value2);
KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
int maxKeyCount = keys.length;
StoreFile.Writer mobWriter = store.createWriterInTmp(currentDate,
maxKeyCount, hcd.getCompactionCompression(), region.getStartKey());
mobFilePath = mobWriter.getPath();
mobWriter.append(key1);
mobWriter.append(key2);
mobWriter.append(key3);
mobWriter.close();
long valueLength1 = key1.getValueLength();
long valueLength2 = key2.getValueLength();
long valueLength3 = key3.getValueLength();
String targetPathName = MobUtils.formatDate(currentDate);
byte[] referenceValue =
Bytes.toBytes(targetPathName + Path.SEPARATOR
+ mobFilePath.getName());
byte[] newReferenceValue1 = Bytes.add(Bytes.toBytes(valueLength1), referenceValue);
byte[] newReferenceValue2 = Bytes.add(Bytes.toBytes(valueLength2), referenceValue);
byte[] newReferenceValue3 = Bytes.add(Bytes.toBytes(valueLength3), referenceValue);
seekKey1 = new KeyValue(row, family, qf1, Long.MAX_VALUE, newReferenceValue1);
seekKey2 = new KeyValue(row, family, qf2, Long.MAX_VALUE, newReferenceValue2);
seekKey3 = new KeyValue(row2, family, qf3, Long.MAX_VALUE, newReferenceValue3);
}
/**
* Getting data from memstore
* @throws IOException
*/
@Test
public void testGetFromMemStore() throws IOException {
final Configuration conf = HBaseConfiguration.create();
init(name.getMethodName(), conf, false);
//Put data in memstore
this.store.add(new KeyValue(row, family, qf1, 1, value));
this.store.add(new KeyValue(row, family, qf2, 1, value));
this.store.add(new KeyValue(row, family, qf3, 1, value));
this.store.add(new KeyValue(row, family, qf4, 1, value));
this.store.add(new KeyValue(row, family, qf5, 1, value));
this.store.add(new KeyValue(row, family, qf6, 1, value));
Scan scan = new Scan(get);
InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getFamily().getName()),
0);
List<Cell> results = new ArrayList<Cell>();
scanner.next(results);
Collections.sort(results, KeyValue.COMPARATOR);
scanner.close();
//Compare
Assert.assertEquals(expected.size(), results.size());
for(int i=0; i<results.size(); i++) {
// Verify the values
Assert.assertEquals(expected.get(i), results.get(i));
}
}
/**
* Getting MOB data from files
* @throws IOException
*/
@Test
public void testGetFromFiles() throws IOException {
final Configuration conf = TEST_UTIL.getConfiguration();
init(name.getMethodName(), conf, false);
//Put data in memstore
this.store.add(new KeyValue(row, family, qf1, 1, value));
this.store.add(new KeyValue(row, family, qf2, 1, value));
//flush
flush(1);
//Add more data
this.store.add(new KeyValue(row, family, qf3, 1, value));
this.store.add(new KeyValue(row, family, qf4, 1, value));
//flush
flush(2);
//Add more data
this.store.add(new KeyValue(row, family, qf5, 1, value));
this.store.add(new KeyValue(row, family, qf6, 1, value));
//flush
flush(3);
Scan scan = new Scan(get);
InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getFamily().getName()),
0);
List<Cell> results = new ArrayList<Cell>();
scanner.next(results);
Collections.sort(results, KeyValue.COMPARATOR);
scanner.close();
//Compare
Assert.assertEquals(expected.size(), results.size());
for(int i=0; i<results.size(); i++) {
Assert.assertEquals(expected.get(i), results.get(i));
}
}
/**
* Getting the reference data from files
* @throws IOException
*/
@Test
public void testGetReferencesFromFiles() throws IOException {
final Configuration conf = HBaseConfiguration.create();
init(name.getMethodName(), conf, false);
//Put data in memstore
this.store.add(new KeyValue(row, family, qf1, 1, value));
this.store.add(new KeyValue(row, family, qf2, 1, value));
//flush
flush(1);
//Add more data
this.store.add(new KeyValue(row, family, qf3, 1, value));
this.store.add(new KeyValue(row, family, qf4, 1, value));
//flush
flush(2);
//Add more data
this.store.add(new KeyValue(row, family, qf5, 1, value));
this.store.add(new KeyValue(row, family, qf6, 1, value));
//flush
flush(3);
Scan scan = new Scan(get);
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getFamily().getName()),
0);
List<Cell> results = new ArrayList<Cell>();
scanner.next(results);
Collections.sort(results, KeyValue.COMPARATOR);
scanner.close();
//Compare
Assert.assertEquals(expected.size(), results.size());
for(int i=0; i<results.size(); i++) {
Cell cell = results.get(i);
Assert.assertTrue(MobUtils.isMobReferenceCell(cell));
}
}
/**
* Getting data from memstore and files
* @throws IOException
*/
@Test
public void testGetFromMemStoreAndFiles() throws IOException {
final Configuration conf = HBaseConfiguration.create();
init(name.getMethodName(), conf, false);
//Put data in memstore
this.store.add(new KeyValue(row, family, qf1, 1, value));
this.store.add(new KeyValue(row, family, qf2, 1, value));
//flush
flush(1);
//Add more data
this.store.add(new KeyValue(row, family, qf3, 1, value));
this.store.add(new KeyValue(row, family, qf4, 1, value));
//flush
flush(2);
//Add more data
this.store.add(new KeyValue(row, family, qf5, 1, value));
this.store.add(new KeyValue(row, family, qf6, 1, value));
Scan scan = new Scan(get);
InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getFamily().getName()),
0);
List<Cell> results = new ArrayList<Cell>();
scanner.next(results);
Collections.sort(results, KeyValue.COMPARATOR);
scanner.close();
//Compare
Assert.assertEquals(expected.size(), results.size());
for(int i=0; i<results.size(); i++) {
Assert.assertEquals(expected.get(i), results.get(i));
}
}
/**
* Getting data from memstore and files
* @throws IOException
*/
@Test
public void testMobCellSizeThreshold() throws IOException {
final Configuration conf = HBaseConfiguration.create();
HColumnDescriptor hcd;
hcd = new HColumnDescriptor(family);
hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(100L));
hcd.setMaxVersions(4);
init(name.getMethodName(), conf, hcd, false);
//Put data in memstore
this.store.add(new KeyValue(row, family, qf1, 1, value));
this.store.add(new KeyValue(row, family, qf2, 1, value));
//flush
flush(1);
//Add more data
this.store.add(new KeyValue(row, family, qf3, 1, value));
this.store.add(new KeyValue(row, family, qf4, 1, value));
//flush
flush(2);
//Add more data
this.store.add(new KeyValue(row, family, qf5, 1, value));
this.store.add(new KeyValue(row, family, qf6, 1, value));
//flush
flush(3);
Scan scan = new Scan(get);
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getFamily().getName()),
0);
List<Cell> results = new ArrayList<Cell>();
scanner.next(results);
Collections.sort(results, KeyValue.COMPARATOR);
scanner.close();
//Compare
Assert.assertEquals(expected.size(), results.size());
for(int i=0; i<results.size(); i++) {
Cell cell = results.get(i);
//this is not mob reference cell.
Assert.assertFalse(MobUtils.isMobReferenceCell(cell));
Assert.assertEquals(expected.get(i), results.get(i));
Assert.assertEquals(100, MobUtils.getMobThreshold(store.getFamily()));
}
}
@Test
public void testCommitFile() throws Exception {
final Configuration conf = HBaseConfiguration.create();
init(name.getMethodName(), conf, true);
String targetPathName = MobUtils.formatDate(new Date());
Path targetPath = new Path(store.getPath(), (targetPathName
+ Path.SEPARATOR + mobFilePath.getName()));
fs.delete(targetPath, true);
Assert.assertFalse(fs.exists(targetPath));
//commit file
store.commitFile(mobFilePath, targetPath);
Assert.assertTrue(fs.exists(targetPath));
}
@Test
public void testResolve() throws Exception {
final Configuration conf = HBaseConfiguration.create();
init(name.getMethodName(), conf, true);
String targetPathName = MobUtils.formatDate(currentDate);
Path targetPath = new Path(store.getPath(), targetPathName);
store.commitFile(mobFilePath, targetPath);
//resolve
Cell resultCell1 = store.resolve(seekKey1, false);
Cell resultCell2 = store.resolve(seekKey2, false);
Cell resultCell3 = store.resolve(seekKey3, false);
//compare
Assert.assertEquals(Bytes.toString(value),
Bytes.toString(CellUtil.cloneValue(resultCell1)));
Assert.assertEquals(Bytes.toString(value),
Bytes.toString(CellUtil.cloneValue(resultCell2)));
Assert.assertEquals(Bytes.toString(value2),
Bytes.toString(CellUtil.cloneValue(resultCell3)));
}
/**
* Flush the memstore
* @param storeFilesSize
* @throws IOException
*/
private void flush(int storeFilesSize) throws IOException{
this.store.snapshot();
flushStore(store, id++);
Assert.assertEquals(storeFilesSize, this.store.getStorefiles().size());
Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).kvset.size());
}
/**
* Flush the memstore
* @param store
* @param id
* @throws IOException
*/
private static void flushStore(HMobStore store, long id) throws IOException {
StoreFlushContext storeFlushCtx = store.createFlushContext(id);
storeFlushCtx.prepare();
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
}
}

View File

@ -0,0 +1,318 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestMobStoreScanner {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] row1 = Bytes.toBytes("row1");
private final static byte [] family = Bytes.toBytes("family");
private final static byte [] qf1 = Bytes.toBytes("qualifier1");
private final static byte [] qf2 = Bytes.toBytes("qualifier2");
protected final byte[] qf3 = Bytes.toBytes("qualifier3");
private static HTable table;
private static HBaseAdmin admin;
private static HColumnDescriptor hcd;
private static HTableDescriptor desc;
private static Random random = new Random();
private static long defaultThreshold = 10;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
public void setUp(long threshold, String TN) throws Exception {
desc = new HTableDescriptor(TableName.valueOf(TN));
hcd = new HColumnDescriptor(family);
hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(threshold));
hcd.setMaxVersions(4);
desc.addFamily(hcd);
admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
admin.createTable(desc);
table = new HTable(TEST_UTIL.getConfiguration(), TN);
}
/**
* Generate the mob value.
*
* @param size the size of the value
* @return the mob value generated
*/
private static byte[] generateMobValue(int size) {
byte[] mobVal = new byte[size];
random.nextBytes(mobVal);
return mobVal;
}
/**
* Set the scan attribute
*
* @param reversed if true, scan will be backward order
* @param mobScanRaw if true, scan will get the mob reference
* @return this
*/
public void setScan(Scan scan, boolean reversed, boolean mobScanRaw) {
scan.setReversed(reversed);
scan.setMaxVersions(4);
if(mobScanRaw) {
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
}
}
@Test
public void testMobStoreScanner() throws Exception {
testGetFromFiles(false);
testGetFromMemStore(false);
testGetReferences(false);
testMobThreshold(false);
}
@Test
public void testReversedMobStoreScanner() throws Exception {
testGetFromFiles(true);
testGetFromMemStore(true);
testGetReferences(true);
testMobThreshold(true);
}
public void testGetFromFiles(boolean reversed) throws Exception {
String TN = "testGetFromFiles" + reversed;
setUp(defaultThreshold, TN);
long ts1 = System.currentTimeMillis();
long ts2 = ts1 + 1;
long ts3 = ts1 + 2;
byte [] value = generateMobValue((int)defaultThreshold+1);
Put put1 = new Put(row1);
put1.add(family, qf1, ts3, value);
put1.add(family, qf2, ts2, value);
put1.add(family, qf3, ts1, value);
table.put(put1);
table.flushCommits();
admin.flush(TN);
Scan scan = new Scan();
setScan(scan, reversed, false);
ResultScanner results = table.getScanner(scan);
int count = 0;
for (Result res : results) {
List<Cell> cells = res.listCells();
for(Cell cell : cells) {
// Verify the value
Assert.assertEquals(Bytes.toString(value),
Bytes.toString(CellUtil.cloneValue(cell)));
count++;
}
}
results.close();
Assert.assertEquals(3, count);
}
public void testGetFromMemStore(boolean reversed) throws Exception {
String TN = "testGetFromMemStore" + reversed;
setUp(defaultThreshold, TN);
long ts1 = System.currentTimeMillis();
long ts2 = ts1 + 1;
long ts3 = ts1 + 2;
byte [] value = generateMobValue((int)defaultThreshold+1);;
Put put1 = new Put(row1);
put1.add(family, qf1, ts3, value);
put1.add(family, qf2, ts2, value);
put1.add(family, qf3, ts1, value);
table.put(put1);
Scan scan = new Scan();
setScan(scan, reversed, false);
ResultScanner results = table.getScanner(scan);
int count = 0;
for (Result res : results) {
List<Cell> cells = res.listCells();
for(Cell cell : cells) {
// Verify the value
Assert.assertEquals(Bytes.toString(value),
Bytes.toString(CellUtil.cloneValue(cell)));
count++;
}
}
results.close();
Assert.assertEquals(3, count);
}
public void testGetReferences(boolean reversed) throws Exception {
String TN = "testGetReferences" + reversed;
setUp(defaultThreshold, TN);
long ts1 = System.currentTimeMillis();
long ts2 = ts1 + 1;
long ts3 = ts1 + 2;
byte [] value = generateMobValue((int)defaultThreshold+1);;
Put put1 = new Put(row1);
put1.add(family, qf1, ts3, value);
put1.add(family, qf2, ts2, value);
put1.add(family, qf3, ts1, value);
table.put(put1);
table.flushCommits();
admin.flush(TN);
Scan scan = new Scan();
setScan(scan, reversed, true);
ResultScanner results = table.getScanner(scan);
int count = 0;
for (Result res : results) {
List<Cell> cells = res.listCells();
for(Cell cell : cells) {
// Verify the value
assertIsMobReference(cell, row1, family, value, TN);
count++;
}
}
results.close();
Assert.assertEquals(3, count);
}
public void testMobThreshold(boolean reversed) throws Exception {
String TN = "testMobThreshold" + reversed;
setUp(defaultThreshold, TN);
byte [] valueLess = generateMobValue((int)defaultThreshold-1);
byte [] valueEqual = generateMobValue((int)defaultThreshold);
byte [] valueGreater = generateMobValue((int)defaultThreshold+1);
long ts1 = System.currentTimeMillis();
long ts2 = ts1 + 1;
long ts3 = ts1 + 2;
Put put1 = new Put(row1);
put1.add(family, qf1, ts3, valueLess);
put1.add(family, qf2, ts2, valueEqual);
put1.add(family, qf3, ts1, valueGreater);
table.put(put1);
table.flushCommits();
admin.flush(TN);
Scan scan = new Scan();
setScan(scan, reversed, true);
Cell cellLess= null;
Cell cellEqual = null;
Cell cellGreater = null;
ResultScanner results = table.getScanner(scan);
int count = 0;
for (Result res : results) {
List<Cell> cells = res.listCells();
for(Cell cell : cells) {
// Verify the value
String qf = Bytes.toString(CellUtil.cloneQualifier(cell));
if(qf.equals(Bytes.toString(qf1))) {
cellLess = cell;
}
if(qf.equals(Bytes.toString(qf2))) {
cellEqual = cell;
}
if(qf.equals(Bytes.toString(qf3))) {
cellGreater = cell;
}
count++;
}
}
Assert.assertEquals(3, count);
assertNotMobReference(cellLess, row1, family, valueLess);
assertNotMobReference(cellEqual, row1, family, valueEqual);
assertIsMobReference(cellGreater, row1, family, valueGreater, TN);
results.close();
}
/**
* Assert the value is not store in mob.
*/
private static void assertNotMobReference(Cell cell, byte[] row, byte[] family,
byte[] value) throws IOException {
Assert.assertEquals(Bytes.toString(row),
Bytes.toString(CellUtil.cloneRow(cell)));
Assert.assertEquals(Bytes.toString(family),
Bytes.toString(CellUtil.cloneFamily(cell)));
Assert.assertTrue(Bytes.toString(value).equals(
Bytes.toString(CellUtil.cloneValue(cell))));
}
/**
* Assert the value is store in mob.
*/
private static void assertIsMobReference(Cell cell, byte[] row, byte[] family,
byte[] value, String TN) throws IOException {
Assert.assertEquals(Bytes.toString(row),
Bytes.toString(CellUtil.cloneRow(cell)));
Assert.assertEquals(Bytes.toString(family),
Bytes.toString(CellUtil.cloneFamily(cell)));
Assert.assertFalse(Bytes.toString(value).equals(
Bytes.toString(CellUtil.cloneValue(cell))));
byte[] referenceValue = CellUtil.cloneValue(cell);
String fileName = Bytes.toString(referenceValue, 8, referenceValue.length-8);
Path mobFamilyPath;
mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
TableName.valueOf(TN)), hcd.getNameAsString());
Path targetPath = new Path(mobFamilyPath, fileName);
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
Assert.assertTrue(fs.exists(targetPath));
}
}