From 5c14f749b3196fe5d9e1efe6dd5d6d7356cc72d0 Mon Sep 17 00:00:00 2001 From: Jonathan M Hsieh Date: Thu, 4 Sep 2014 05:41:21 -0700 Subject: [PATCH] HBASE-11643 Read and write MOB in HBase (Jingcheng Du) --- .../java/org/apache/hadoop/hbase/TagType.java | 4 + .../src/main/resources/hbase-default.xml | 29 ++ .../master/handler/DeleteTableHandler.java | 30 ++ .../hadoop/hbase/mob/CachedMobFile.java | 114 +++++ .../hbase/mob/DefaultMobStoreFlusher.java | 217 ++++++++ .../hadoop/hbase/mob/MobCacheConfig.java | 56 +++ .../apache/hadoop/hbase/mob/MobConstants.java | 61 +++ .../org/apache/hadoop/hbase/mob/MobFile.java | 140 ++++++ .../apache/hadoop/hbase/mob/MobFileCache.java | 270 ++++++++++ .../apache/hadoop/hbase/mob/MobFileName.java | 169 +++++++ .../hadoop/hbase/mob/MobStoreEngine.java | 40 ++ .../org/apache/hadoop/hbase/mob/MobUtils.java | 263 ++++++++++ .../regionserver/DefaultStoreEngine.java | 16 +- .../hadoop/hbase/regionserver/HMobStore.java | 268 ++++++++++ .../hadoop/hbase/regionserver/HRegion.java | 4 + .../hadoop/hbase/regionserver/HStore.java | 48 +- .../hbase/regionserver/MobStoreScanner.java | 69 +++ .../regionserver/ReversedMobStoreScanner.java | 69 +++ .../hadoop/hbase/regionserver/StoreFile.java | 2 +- .../apache/hadoop/hbase/mob/MobTestUtil.java | 86 ++++ .../hadoop/hbase/mob/TestCachedMobFile.java | 154 ++++++ .../hbase/mob/TestDefaultMobStoreFlusher.java | 193 +++++++ .../hbase/mob/TestMobDataBlockEncoding.java | 141 ++++++ .../apache/hadoop/hbase/mob/TestMobFile.java | 124 +++++ .../hadoop/hbase/mob/TestMobFileCache.java | 206 ++++++++ .../hadoop/hbase/mob/TestMobFileName.java | 79 +++ .../regionserver/TestDeleteMobTable.java | 225 +++++++++ .../hbase/regionserver/TestHMobStore.java | 471 ++++++++++++++++++ .../regionserver/TestMobStoreScanner.java | 318 ++++++++++++ 29 files changed, 3853 insertions(+), 13 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java index a21f7de1470..8613a3534d3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java @@ -28,4 +28,8 @@ public final class TagType { public static final byte VISIBILITY_TAG_TYPE = (byte) 2; public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4; + + // mob tags + public static final byte MOB_REFERENCE_TAG_TYPE = (byte) 5; + public static final byte MOB_TABLE_NAME_TAG_TYPE = (byte) 6; } diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index c724c27cbf9..872bbc5f07b 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1454,4 +1454,33 @@ possible configurations would overwhelm and obscure the important. hbase.http.staticuser.user dr.stack + + + hbase.mob.file.cache.size + 1000 + + Number of opened file handlers to cache. + A larger value will benefit reads by providing more file handlers per mob + file cache and would reduce frequent file opening and closing. + However, if this is set too high, this could lead to a "too many opened file handlers" + The default value is 1000. + + + + hbase.mob.cache.evict.period + 3600 + + The amount of time in seconds before the mob cache evicts cached mob files. + The default value is 3600 seconds. + + + + hbase.mob.cache.evict.remain.ratio + 0.5f + + The ratio (between 0.0 and 1.0) of files that remains cached after an eviction + is triggered when the number of cached mob files exceeds the hbase.mob.file.cache.size. + The default value is 0.5f. + + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java index 730da734e92..b5cdae9e725 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CoordinatedStateException; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; @@ -43,6 +45,8 @@ import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; @InterfaceAudience.Private public class DeleteTableHandler extends TableEventHandler { @@ -152,10 +156,36 @@ public class DeleteTableHandler extends TableEventHandler { tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName())); } + // Archive the mob data if there is a mob-enabled column + HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies(); + boolean hasMob = false; + for (HColumnDescriptor hcd : hcds) { + if (MobUtils.isMobFamily(hcd)) { + hasMob = true; + break; + } + } + Path mobTableDir = null; + if (hasMob) { + // Archive mob data + mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), + tableName); + Path regionDir = + new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName()); + if (fs.exists(regionDir)) { + HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir); + } + } // 4. Delete table directory from FS (temp directory) if (!fs.delete(tempTableDir, true)) { LOG.error("Couldn't delete " + tempTableDir); } + // Delete the table directory where the mob files are saved + if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) { + if (!fs.delete(mobTableDir, true)) { + LOG.error("Couldn't delete " + mobTableDir); + } + } LOG.debug("Table '" + tableName + "' archived!"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java new file mode 100644 index 00000000000..457eb6c50ed --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java @@ -0,0 +1,114 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.StoreFile; + +/** + * Cached mob file. + */ +@InterfaceAudience.Private +public class CachedMobFile extends MobFile implements Comparable { + + private long accessCount; + private AtomicLong referenceCount = new AtomicLong(0); + + public CachedMobFile(StoreFile sf) { + super(sf); + } + + public static CachedMobFile create(FileSystem fs, Path path, Configuration conf, + CacheConfig cacheConf) throws IOException { + StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE); + return new CachedMobFile(sf); + } + + public void access(long accessCount) { + this.accessCount = accessCount; + } + + public int compareTo(CachedMobFile that) { + if (this.accessCount == that.accessCount) return 0; + return this.accessCount < that.accessCount ? 1 : -1; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof CachedMobFile)) { + return false; + } + return compareTo((CachedMobFile) obj) == 0; + } + + @Override + public int hashCode() { + return (int)(accessCount ^ (accessCount >>> 32)); + } + + /** + * Opens the mob file if it's not opened yet and increases the reference. + * It's not thread-safe. Use MobFileCache.openFile() instead. + * The reader of the mob file is just opened when it's not opened no matter how many times + * this open() method is invoked. + * The reference is a counter that how many times this reader is referenced. When the + * reference is 0, this reader is closed. + */ + @Override + public void open() throws IOException { + super.open(); + referenceCount.incrementAndGet(); + } + + /** + * Decreases the reference of the underlying reader for the mob file. + * It's not thread-safe. Use MobFileCache.closeFile() instead. + * This underlying reader isn't closed until the reference is 0. + */ + @Override + public void close() throws IOException { + long refs = referenceCount.decrementAndGet(); + if (refs == 0) { + super.close(); + } + } + + /** + * Gets the reference of the current mob file. + * Internal usage, currently it's for testing. + * @return The reference of the current mob file. + */ + public long getReferenceCount() { + return this.referenceCount.longValue(); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java new file mode 100644 index 00000000000..4a036a70e1f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -0,0 +1,217 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; +import org.apache.hadoop.hbase.regionserver.HMobStore; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. + * If the store is not a mob store, the flusher flushes the MemStore the same with + * DefaultStoreFlusher, + * If the store is a mob store, the flusher flushes the MemStore into two places. + * One is the store files of HBase, the other is the mob files. + *
    + *
  1. Cells that are not PUT type or have the delete mark will be directly flushed to HBase.
  2. + *
  3. If the size of a cell value is larger than a threshold, it'll be flushed + * to a mob file, another cell with the path of this file will be flushed to HBase.
  4. + *
  5. If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to + * HBase directly.
  6. + *
+ * + */ +@InterfaceAudience.Private +public class DefaultMobStoreFlusher extends DefaultStoreFlusher { + + private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class); + private final Object flushLock = new Object(); + private long mobCellValueSizeThreshold = 0; + private Path targetPath; + private HMobStore mobStore; + + public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException { + super(conf, store); + mobCellValueSizeThreshold = MobUtils.getMobThreshold(store.getFamily()); + this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(), + store.getColumnFamilyName()); + if (!this.store.getFileSystem().exists(targetPath)) { + this.store.getFileSystem().mkdirs(targetPath); + } + this.mobStore = (HMobStore) store; + } + + /** + * Flushes the snapshot of the MemStore. + * If this store is not a mob store, flush the cells in the snapshot to store files of HBase. + * If the store is a mob one, the flusher flushes the MemStore into two places. + * One is the store files of HBase, the other is the mob files. + *
    + *
  1. Cells that are not PUT type or have the delete mark will be directly flushed to + * HBase.
  2. + *
  3. If the size of a cell value is larger than a threshold, it'll be + * flushed to a mob file, another cell with the path of this file will be flushed to HBase.
  4. + *
  5. If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to + * HBase directly.
  6. + *
+ */ + @Override + public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, + MonitoredTask status) throws IOException { + ArrayList result = new ArrayList(); + int cellsCount = snapshot.getCellsCount(); + if (cellsCount == 0) return result; // don't flush if there are no entries + + // Use a store scanner to find which rows to flush. + long smallestReadPoint = store.getSmallestReadPoint(); + InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + if (scanner == null) { + return result; // NULL scanner returned from coprocessor hooks means skip normal processing + } + StoreFile.Writer writer; + try { + // TODO: We can fail in the below block before we complete adding this flush to + // list of store files. Add cleanup of anything put on filesystem if we fail. + synchronized (flushLock) { + status.setStatus("Flushing " + store + ": creating writer"); + // Write the map out to the disk + writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(), + false, true, true); + writer.setTimeRangeTracker(snapshot.getTimeRangeTracker()); + try { + // It's a mob store, flush the cells in a mob way. This is the difference of flushing + // between a normal and a mob store. + performMobFlush(snapshot, cacheFlushId, scanner, writer, status); + } finally { + finalizeWriter(writer, cacheFlushId, status); + } + } + } finally { + scanner.close(); + } + LOG.info("Flushed, sequenceid=" + cacheFlushId + ", memsize=" + + snapshot.getSize() + ", hasBloomFilter=" + writer.hasGeneralBloom() + + ", into tmp file " + writer.getPath()); + result.add(writer.getPath()); + return result; + } + + /** + * Flushes the cells in the mob store. + *
    In the mob store, the cells with PUT type might have or have no mob tags. + *
  1. If a cell does not have a mob tag, flushing the cell to different files depends + * on the value length. If the length is larger than a threshold, it's flushed to a + * mob file and the mob file is flushed to a store file in HBase. Otherwise, directly + * flush the cell to a store file in HBase.
  2. + *
  3. If a cell have a mob tag, its value is a mob file name, directly flush it + * to a store file in HBase.
  4. + *
+ * @param snapshot Memstore snapshot. + * @param cacheFlushId Log cache flush sequence number. + * @param scanner The scanner of memstore snapshot. + * @param writer The store file writer. + * @param status Task that represents the flush operation and may be updated with status. + * @throws IOException + */ + protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, + InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException { + StoreFile.Writer mobFileWriter = null; + int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, + HConstants.COMPACTION_KV_MAX_DEFAULT); + long mobKVCount = 0; + long time = snapshot.getTimeRangeTracker().getMaximumTimestamp(); + mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), + store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); + // the target path is {tableName}/.mob/{cfName}/mobFiles + // the relative path is mobFiles + byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); + try { + Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName() + .getName()); + List cells = new ArrayList(); + boolean hasMore; + do { + hasMore = scanner.next(cells, compactionKVMax); + if (!cells.isEmpty()) { + for (Cell c : cells) { + // If we know that this KV is going to be included always, then let us + // set its memstoreTS to 0. This will help us save space when writing to + // disk. + KeyValue kv = KeyValueUtil.ensureKeyValue(c); + if (kv.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(kv) + || kv.getTypeByte() != KeyValue.Type.Put.getCode()) { + writer.append(kv); + } else { + // append the original keyValue in the mob file. + mobFileWriter.append(kv); + mobKVCount++; + + // append the tags to the KeyValue. + // The key is same, the value is the filename of the mob file + KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag); + writer.append(reference); + } + } + cells.clear(); + } + } while (hasMore); + } finally { + status.setStatus("Flushing mob file " + store + ": appending metadata"); + mobFileWriter.appendMetadata(cacheFlushId, false); + status.setStatus("Flushing mob file " + store + ": closing flushed file"); + mobFileWriter.close(); + } + + if (mobKVCount > 0) { + // commit the mob file from temp folder to target folder. + // If the mob file is committed successfully but the store file is not, + // the committed mob file will be handled by the sweep tool as an unused + // file. + mobStore.commitFile(mobFileWriter.getPath(), targetPath); + } else { + try { + // If the mob file is empty, delete it instead of committing. + store.getFileSystem().delete(mobFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Fail to delete the temp mob file", e); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java new file mode 100644 index 00000000000..b16001085f8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java @@ -0,0 +1,56 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; + +/** + * The cache configuration for the mob. + */ +@InterfaceAudience.Private +public class MobCacheConfig extends CacheConfig { + + private static MobFileCache mobFileCache; + + public MobCacheConfig(Configuration conf, HColumnDescriptor family) { + super(conf, family); + instantiateMobFileCache(conf); + } + + /** + * Instantiates the MobFileCache. + * @param conf The current configuration. + */ + public static synchronized void instantiateMobFileCache(Configuration conf) { + if (mobFileCache == null) { + mobFileCache = new MobFileCache(conf); + } + } + + /** + * Gets the MobFileCache. + * @return The MobFileCache. + */ + public MobFileCache getMobFileCache() { + return mobFileCache; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java new file mode 100644 index 00000000000..d2087224cc7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java @@ -0,0 +1,61 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * The constants used in mob. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MobConstants { + + public static final byte[] IS_MOB = Bytes.toBytes("isMob"); + public static final byte[] MOB_THRESHOLD = Bytes.toBytes("mobThreshold"); + public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k + + public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw"; + public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks"; + + public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size"; + public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000; + + public static final String MOB_DIR_NAME = "mobdir"; + public static final String MOB_REGION_NAME = ".mob"; + public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME); + + public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period"; + public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio"; + public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE, + HConstants.EMPTY_BYTE_ARRAY); + + public static final float DEFAULT_EVICT_REMAIN_RATIO = 0.5f; + public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l; + + public final static String TEMP_DIR_NAME = ".tmp"; + private MobConstants() { + + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java new file mode 100644 index 00000000000..a120057c89a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java @@ -0,0 +1,140 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; + +/** + * The mob file. + */ +@InterfaceAudience.Private +public class MobFile { + + private StoreFile sf; + + // internal use only for sub classes + protected MobFile() { + } + + protected MobFile(StoreFile sf) { + this.sf = sf; + } + + /** + * Internal use only. This is used by the sweeper. + * + * @return The store file scanner. + * @throws IOException + */ + public StoreFileScanner getScanner() throws IOException { + List sfs = new ArrayList(); + sfs.add(sf); + List sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, + false, null, sf.getMaxMemstoreTS()); + + return sfScanners.get(0); + } + + /** + * Reads a cell from the mob file. + * @param search The cell need to be searched in the mob file. + * @param cacheMobBlocks Should this scanner cache blocks. + * @return The cell in the mob file. + * @throws IOException + */ + public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException { + Cell result = null; + StoreFileScanner scanner = null; + List sfs = new ArrayList(); + sfs.add(sf); + try { + List sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, + cacheMobBlocks, true, false, null, sf.getMaxMemstoreTS()); + if (!sfScanners.isEmpty()) { + scanner = sfScanners.get(0); + if (scanner.seek(search)) { + result = scanner.peek(); + } + } + } finally { + if (scanner != null) { + scanner.close(); + } + } + return result; + } + + /** + * Gets the file name. + * @return The file name. + */ + public String getFileName() { + return sf.getPath().getName(); + } + + /** + * Opens the underlying reader. + * It's not thread-safe. Use MobFileCache.openFile() instead. + * @throws IOException + */ + public void open() throws IOException { + if (sf.getReader() == null) { + sf.createReader(); + } + } + + /** + * Closes the underlying reader, but do no evict blocks belonging to this file. + * It's not thread-safe. Use MobFileCache.closeFile() instead. + * @throws IOException + */ + public void close() throws IOException { + if (sf != null) { + sf.closeReader(false); + sf = null; + } + } + + /** + * Creates an instance of the MobFile. + * @param fs The file system. + * @param path The path of the underlying StoreFile. + * @param conf The configuration. + * @param cacheConf The CacheConfig. + * @return An instance of the MobFile. + * @throws IOException + */ + public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf) + throws IOException { + StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE); + return new MobFile(sf); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java new file mode 100644 index 00000000000..97530b1d8d3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java @@ -0,0 +1,270 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.IdLock; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * The cache for mob files. + * This cache doesn't cache the mob file blocks. It only caches the references of mob files. + * We are doing this to avoid opening and closing mob files all the time. We just keep + * references open. + */ +@InterfaceAudience.Private +public class MobFileCache { + + private static final Log LOG = LogFactory.getLog(MobFileCache.class); + + /* + * Eviction and statistics thread. Periodically run to print the statistics and + * evict the lru cached mob files when the count of the cached files is larger + * than the threshold. + */ + static class EvictionThread extends Thread { + MobFileCache lru; + + public EvictionThread(MobFileCache lru) { + super("MobFileCache.EvictionThread"); + setDaemon(true); + this.lru = lru; + } + + @Override + public void run() { + lru.evict(); + } + } + + // a ConcurrentHashMap, accesses to this map are synchronized. + private Map map = null; + // caches access count + private final AtomicLong count; + private long lastAccess; + private final AtomicLong miss; + private long lastMiss; + + // a lock to sync the evict to guarantee the eviction occurs in sequence. + // the method evictFile is not sync by this lock, the ConcurrentHashMap does the sync there. + private final ReentrantLock evictionLock = new ReentrantLock(true); + + //stripes lock on each mob file based on its hash. Sync the openFile/closeFile operations. + private final IdLock keyLock = new IdLock(); + + private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setNameFormat("MobFileCache #%d").setDaemon(true).build()); + private final Configuration conf; + + // the count of the cached references to mob files + private final int mobFileMaxCacheSize; + private final boolean isCacheEnabled; + private float evictRemainRatio; + + public MobFileCache(Configuration conf) { + this.conf = conf; + this.mobFileMaxCacheSize = conf.getInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, + MobConstants.DEFAULT_MOB_FILE_CACHE_SIZE); + isCacheEnabled = (mobFileMaxCacheSize > 0); + map = new ConcurrentHashMap(mobFileMaxCacheSize); + this.count = new AtomicLong(0); + this.miss = new AtomicLong(0); + this.lastAccess = 0; + this.lastMiss = 0; + if (isCacheEnabled) { + long period = conf.getLong(MobConstants.MOB_CACHE_EVICT_PERIOD, + MobConstants.DEFAULT_MOB_CACHE_EVICT_PERIOD); // in seconds + evictRemainRatio = conf.getFloat(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO, + MobConstants.DEFAULT_EVICT_REMAIN_RATIO); + if (evictRemainRatio < 0.0) { + evictRemainRatio = 0.0f; + LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is less than 0.0, 0.0 is used."); + } else if (evictRemainRatio > 1.0) { + evictRemainRatio = 1.0f; + LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is larger than 1.0, 1.0 is used."); + } + this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period, + TimeUnit.SECONDS); + } + LOG.info("MobFileCache is initialized, and the cache size is " + mobFileMaxCacheSize); + } + + /** + * Evicts the lru cached mob files when the count of the cached files is larger + * than the threshold. + */ + public void evict() { + if (isCacheEnabled) { + // Ensure only one eviction at a time + if (!evictionLock.tryLock()) { + return; + } + printStatistics(); + List evictedFiles = new ArrayList(); + try { + if (map.size() <= mobFileMaxCacheSize) { + return; + } + List files = new ArrayList(map.values()); + Collections.sort(files); + int start = (int) (mobFileMaxCacheSize * evictRemainRatio); + if (start >= 0) { + for (int i = start; i < files.size(); i++) { + String name = files.get(i).getFileName(); + CachedMobFile evictedFile = map.remove(name); + if (evictedFile != null) { + evictedFiles.add(evictedFile); + } + } + } + } finally { + evictionLock.unlock(); + } + // EvictionLock is released. Close the evicted files one by one. + // The closes are sync in the closeFile method. + for (CachedMobFile evictedFile : evictedFiles) { + closeFile(evictedFile); + } + } + } + + /** + * Evicts the cached file by the name. + * @param fileName The name of a cached file. + */ + public void evictFile(String fileName) { + if (isCacheEnabled) { + IdLock.Entry lockEntry = null; + try { + // obtains the lock to close the cached file. + lockEntry = keyLock.getLockEntry(fileName.hashCode()); + CachedMobFile evictedFile = map.remove(fileName); + if (evictedFile != null) { + evictedFile.close(); + } + } catch (IOException e) { + LOG.error("Fail to evict the file " + fileName, e); + } finally { + if (lockEntry != null) { + keyLock.releaseLockEntry(lockEntry); + } + } + } + } + + /** + * Opens a mob file. + * @param fs The current file system. + * @param path The file path. + * @param cacheConf The current MobCacheConfig + * @return A opened mob file. + * @throws IOException + */ + public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException { + if (!isCacheEnabled) { + return MobFile.create(fs, path, conf, cacheConf); + } else { + String fileName = path.getName(); + CachedMobFile cached = map.get(fileName); + IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode()); + try { + if (cached == null) { + cached = map.get(fileName); + if (cached == null) { + if (map.size() > mobFileMaxCacheSize) { + evict(); + } + cached = CachedMobFile.create(fs, path, conf, cacheConf); + cached.open(); + map.put(fileName, cached); + miss.incrementAndGet(); + } + } + cached.open(); + cached.access(count.incrementAndGet()); + } finally { + keyLock.releaseLockEntry(lockEntry); + } + return cached; + } + } + + /** + * Closes a mob file. + * @param file The mob file that needs to be closed. + */ + public void closeFile(MobFile file) { + IdLock.Entry lockEntry = null; + try { + if (!isCacheEnabled) { + file.close(); + } else { + lockEntry = keyLock.getLockEntry(file.getFileName().hashCode()); + file.close(); + } + } catch (IOException e) { + LOG.error("MobFileCache, Exception happen during close " + file.getFileName(), e); + } finally { + if (lockEntry != null) { + keyLock.releaseLockEntry(lockEntry); + } + } + } + + /** + * Gets the count of cached mob files. + * @return The count of the cached mob files. + */ + public int getCacheSize() { + return map == null ? 0 : map.size(); + } + + /** + * Prints the statistics. + */ + public void printStatistics() { + long access = count.get() - lastAccess; + long missed = miss.get() - lastMiss; + long hitRate = (access - missed) * 100 / access; + LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: " + + (access - missed) + ", hit rate: " + + ((access == 0) ? 0 : hitRate) + "%"); + lastAccess += access; + lastMiss += missed; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java new file mode 100644 index 00000000000..937e965c596 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java @@ -0,0 +1,169 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.MD5Hash; + +/** + * The mob file name. + * It consists of a md5 of a start key, a date and an uuid. + * It looks like md5(start) + date + uuid. + *
    + *
  1. 0-31 characters: md5 hex string of a start key. Since the length of the start key is not + * fixed, have to use the md5 instead which has a fix length.
  2. + *
  3. 32-39 characters: a string of a date with format yyyymmdd. The date is the latest timestamp + * of cells in this file
  4. + *
  5. the remaining characters: the uuid.
  6. + *
+ * Using md5 hex string of start key as the prefix of file name makes files with the same start + * key unique, they're different from the ones with other start keys + * The cells come from different regions might be in the same mob file by region split, + * this is allowed. + * Has the latest timestamp of cells in the file name in order to clean the expired mob files by + * TTL easily. If this timestamp is older than the TTL, it's regarded as expired. + */ +@InterfaceAudience.Private +public class MobFileName { + + private final String date; + private final String startKey; + private final String uuid; + private final String fileName; + + /** + * @param startKey + * The start key. + * @param date + * The string of the latest timestamp of cells in this file, the format is yyyymmdd. + * @param uuid + * The uuid + */ + private MobFileName(byte[] startKey, String date, String uuid) { + this.startKey = MD5Hash.getMD5AsHex(startKey, 0, startKey.length); + this.uuid = uuid; + this.date = date; + this.fileName = this.startKey + date + uuid; + } + + /** + * @param startKey + * The md5 hex string of the start key. + * @param date + * The string of the latest timestamp of cells in this file, the format is yyyymmdd. + * @param uuid + * The uuid + */ + private MobFileName(String startKey, String date, String uuid) { + this.startKey = startKey; + this.uuid = uuid; + this.date = date; + this.fileName = this.startKey + date + uuid; + } + + /** + * Creates an instance of MobFileName + * + * @param startKey + * The start key. + * @param date + * The string of the latest timestamp of cells in this file, the format is yyyymmdd. + * @param uuid The uuid. + * @return An instance of a MobFileName. + */ + public static MobFileName create(byte[] startKey, String date, String uuid) { + return new MobFileName(startKey, date, uuid); + } + + /** + * Creates an instance of MobFileName + * + * @param startKey + * The md5 hex string of the start key. + * @param date + * The string of the latest timestamp of cells in this file, the format is yyyymmdd. + * @param uuid The uuid. + * @return An instance of a MobFileName. + */ + public static MobFileName create(String startKey, String date, String uuid) { + return new MobFileName(startKey, date, uuid); + } + + /** + * Creates an instance of MobFileName. + * @param fileName The string format of a file name. + * @return An instance of a MobFileName. + */ + public static MobFileName create(String fileName) { + // The format of a file name is md5HexString(0-31bytes) + date(32-39bytes) + UUID + // The date format is yyyyMMdd + String startKey = fileName.substring(0, 32); + String date = fileName.substring(32, 40); + String uuid = fileName.substring(40); + return new MobFileName(startKey, date, uuid); + } + + /** + * Gets the hex string of the md5 for a start key. + * @return The hex string of the md5 for a start key. + */ + public String getStartKey() { + return startKey; + } + + /** + * Gets the date string. Its format is yyyymmdd. + * @return The date string. + */ + public String getDate() { + return this.date; + } + + @Override + public int hashCode() { + StringBuilder builder = new StringBuilder(); + builder.append(startKey); + builder.append(date); + builder.append(uuid); + return builder.toString().hashCode(); + } + + @Override + public boolean equals(Object anObject) { + if (this == anObject) { + return true; + } + if (anObject instanceof MobFileName) { + MobFileName another = (MobFileName) anObject; + if (this.startKey.equals(another.startKey) && this.date.equals(another.date) + && this.uuid.equals(another.uuid)) { + return true; + } + } + return false; + } + + /** + * Gets the file name. + * @return The file name. + */ + public String getFileName() { + return this.fileName; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java new file mode 100644 index 00000000000..d5e6f2ef215 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.Store; + +/** + * MobStoreEngine creates the mob specific compactor, and store flusher. + */ +@InterfaceAudience.Private +public class MobStoreEngine extends DefaultStoreEngine { + + @Override + protected void createStoreFlusher(Configuration conf, Store store) throws IOException { + // When using MOB, we use DefaultMobStoreFlusher always + // Just use the compactor and compaction policy as that in DefaultStoreEngine. We can have MOB + // specific compactor and policy when that is implemented. + storeFlusher = new DefaultMobStoreFlusher(conf, store); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java new file mode 100644 index 00000000000..149eed41152 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -0,0 +1,263 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; + +/** + * The mob utilities + */ +@InterfaceAudience.Private +public class MobUtils { + + private static final ThreadLocal LOCAL_FORMAT = + new ThreadLocal() { + @Override + protected SimpleDateFormat initialValue() { + return new SimpleDateFormat("yyyyMMdd"); + } + }; + + /** + * Indicates whether the column family is a mob one. + * @param hcd The descriptor of a column family. + * @return True if this column family is a mob one, false if it's not. + */ + public static boolean isMobFamily(HColumnDescriptor hcd) { + byte[] isMob = hcd.getValue(MobConstants.IS_MOB); + return isMob != null && isMob.length == 1 && Bytes.toBoolean(isMob); + } + + /** + * Gets the mob threshold. + * If the size of a cell value is larger than this threshold, it's regarded as a mob. + * The default threshold is 1024*100(100K)B. + * @param hcd The descriptor of a column family. + * @return The threshold. + */ + public static long getMobThreshold(HColumnDescriptor hcd) { + byte[] threshold = hcd.getValue(MobConstants.MOB_THRESHOLD); + return threshold != null && threshold.length == Bytes.SIZEOF_LONG ? Bytes.toLong(threshold) + : MobConstants.DEFAULT_MOB_THRESHOLD; + } + + /** + * Formats a date to a string. + * @param date The date. + * @return The string format of the date, it's yyyymmdd. + */ + public static String formatDate(Date date) { + return LOCAL_FORMAT.get().format(date); + } + + /** + * Parses the string to a date. + * @param dateString The string format of a date, it's yyyymmdd. + * @return A date. + * @throws ParseException + */ + public static Date parseDate(String dateString) throws ParseException { + return LOCAL_FORMAT.get().parse(dateString); + } + + /** + * Whether the current cell is a mob reference cell. + * @param cell The current cell. + * @return True if the cell has a mob reference tag, false if it doesn't. + */ + public static boolean isMobReferenceCell(Cell cell) { + if (cell.getTagsLength() > 0) { + Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(), + TagType.MOB_REFERENCE_TAG_TYPE); + return tag != null; + } + return false; + } + + /** + * Whether the tag list has a mob reference tag. + * @param tags The tag list. + * @return True if the list has a mob reference tag, false if it doesn't. + */ + public static boolean hasMobReferenceTag(List tags) { + if (!tags.isEmpty()) { + for (Tag tag : tags) { + if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) { + return true; + } + } + } + return false; + } + + /** + * Indicates whether it's a raw scan. + * The information is set in the attribute "hbase.mob.scan.raw" of scan. + * For a mob cell, in a normal scan the scanners retrieves the mob cell from the mob file. + * In a raw scan, the scanner directly returns cell in HBase without retrieve the one in + * the mob file. + * @param scan The current scan. + * @return True if it's a raw scan. + */ + public static boolean isRawMobScan(Scan scan) { + byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW); + try { + return raw != null && Bytes.toBoolean(raw); + } catch (IllegalArgumentException e) { + return false; + } + } + + /** + * Indicates whether the scan contains the information of caching blocks. + * The information is set in the attribute "hbase.mob.cache.blocks" of scan. + * @param scan The current scan. + * @return True when the Scan attribute specifies to cache the MOB blocks. + */ + public static boolean isCacheMobBlocks(Scan scan) { + byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS); + try { + return cache != null && Bytes.toBoolean(cache); + } catch (IllegalArgumentException e) { + return false; + } + } + + /** + * Sets the attribute of caching blocks in the scan. + * + * @param scan + * The current scan. + * @param cacheBlocks + * True, set the attribute of caching blocks into the scan, the scanner with this scan + * caches blocks. + * False, the scanner doesn't cache blocks for this scan. + */ + public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) { + scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks)); + } + + /** + * Gets the root dir of the mob files. + * It's {HBASE_DIR}/mobdir. + * @param conf The current configuration. + * @return the root dir of the mob file. + */ + public static Path getMobHome(Configuration conf) { + Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR)); + return new Path(hbaseDir, MobConstants.MOB_DIR_NAME); + } + + /** + * Gets the region dir of the mob files. + * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}. + * @param conf The current configuration. + * @param tableName The current table name. + * @return The region dir of the mob files. + */ + public static Path getMobRegionPath(Configuration conf, TableName tableName) { + Path tablePath = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName); + HRegionInfo regionInfo = getMobRegionInfo(tableName); + return new Path(tablePath, regionInfo.getEncodedName()); + } + + /** + * Gets the family dir of the mob files. + * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}. + * @param conf The current configuration. + * @param tableName The current table name. + * @param familyName The current family name. + * @return The family dir of the mob files. + */ + public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) { + return new Path(getMobRegionPath(conf, tableName), familyName); + } + + /** + * Gets the family dir of the mob files. + * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}. + * @param regionPath The path of mob region which is a dummy one. + * @param familyName The current family name. + * @return The family dir of the mob files. + */ + public static Path getMobFamilyPath(Path regionPath, String familyName) { + return new Path(regionPath, familyName); + } + + /** + * Gets the HRegionInfo of the mob files. + * This is a dummy region. The mob files are not saved in a region in HBase. + * This is only used in mob snapshot. It's internally used only. + * @param tableName + * @return A dummy mob region info. + */ + public static HRegionInfo getMobRegionInfo(TableName tableName) { + HRegionInfo info = new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES, + HConstants.EMPTY_END_ROW, false, 0); + return info; + } + + /** + * Creates a mob reference KeyValue. + * The value of the mob reference KeyValue is mobCellValueSize + mobFileName. + * @param kv The original KeyValue. + * @param fileName The mob file name where the mob reference KeyValue is written. + * @param tableNameTag The tag of the current table name. It's very important in + * cloning the snapshot. + * @return The mob reference KeyValue. + */ + public static KeyValue createMobRefKeyValue(KeyValue kv, byte[] fileName, Tag tableNameTag) { + // Append the tags to the KeyValue. + // The key is same, the value is the filename of the mob file + List existingTags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength()); + existingTags.add(MobConstants.MOB_REF_TAG); + // Add the tag of the source table name, this table is where this mob file is flushed + // from. + // It's very useful in cloning the snapshot. When reading from the cloning table, we need to + // find the original mob files by this table name. For details please see cloning + // snapshot for mob files. + existingTags.add(tableNameTag); + long valueLength = kv.getValueLength(); + byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName); + KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), + kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), + kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put, + refValue, 0, refValue.length, existingTags); + reference.setSequenceId(kv.getSequenceId()); + return reference; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index f2a0b062e2c..4afa80c6694 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -63,6 +63,12 @@ public class DefaultStoreEngine extends StoreEngine< protected void createComponents( Configuration conf, Store store, KVComparator kvComparator) throws IOException { storeFileManager = new DefaultStoreFileManager(kvComparator, conf); + createCompactor(conf, store); + createCompactionPolicy(conf, store); + createStoreFlusher(conf, store); + } + + protected void createCompactor(Configuration conf, Store store) throws IOException { String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName()); try { compactor = ReflectionUtils.instantiateWithCustomCtor(className, @@ -70,7 +76,10 @@ public class DefaultStoreEngine extends StoreEngine< } catch (Exception e) { throw new IOException("Unable to load configured compactor '" + className + "'", e); } - className = conf.get( + } + + protected void createCompactionPolicy(Configuration conf, Store store) throws IOException { + String className = conf.get( DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName()); try { compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className, @@ -79,7 +88,10 @@ public class DefaultStoreEngine extends StoreEngine< } catch (Exception e) { throw new IOException("Unable to load configured compaction policy '" + className + "'", e); } - className = conf.get( + } + + protected void createStoreFlusher(Configuration conf, Store store) throws IOException { + String className = conf.get( DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName()); try { storeFlusher = ReflectionUtils.instantiateWithCustomCtor(className, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java new file mode 100644 index 00000000000..6b4e4504628 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -0,0 +1,268 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Date; +import java.util.NavigableSet; +import java.util.UUID; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.mob.MobCacheConfig; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFile; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.hbase.mob.MobStoreEngine; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * The store implementation to save MOBs (medium objects), it extends the HStore. + * When a descriptor of a column family has the value "is_mob", it means this column family + * is a mob one. When a HRegion instantiate a store for this column family, the HMobStore is + * created. + * HMobStore is almost the same with the HStore except using different types of scanners. + * In the method of getScanner, the MobStoreScanner and MobReversedStoreScanner are returned. + * In these scanners, a additional seeks in the mob files should be performed after the seek + * to HBase is done. + * The store implements how we save MOBs by extending HStore. When a descriptor + * of a column family has the value "isMob", it means this column family is a mob one. When a + * HRegion instantiate a store for this column family, the HMobStore is created. HMobStore is + * almost the same with the HStore except using different types of scanners. In the method of + * getScanner, the MobStoreScanner and MobReversedStoreScanner are returned. In these scanners, a + * additional seeks in the mob files should be performed after the seek in HBase is done. + */ +@InterfaceAudience.Private +public class HMobStore extends HStore { + + private MobCacheConfig mobCacheConfig; + private Path homePath; + private Path mobFamilyPath; + + public HMobStore(final HRegion region, final HColumnDescriptor family, + final Configuration confParam) throws IOException { + super(region, family, confParam); + this.mobCacheConfig = (MobCacheConfig) cacheConf; + this.homePath = MobUtils.getMobHome(conf); + this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(), + family.getNameAsString()); + } + + /** + * Creates the mob cache config. + */ + @Override + protected void createCacheConf(HColumnDescriptor family) { + cacheConf = new MobCacheConfig(conf, family); + } + + /** + * Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, a additional seeks in + * the mob files should be performed after the seek in HBase is done. + */ + @Override + protected KeyValueScanner createScanner(Scan scan, final NavigableSet targetCols, + long readPt, KeyValueScanner scanner) throws IOException { + if (scanner == null) { + scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan, + targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt); + } + return scanner; + } + + /** + * Creates the mob store engine. + */ + @Override + protected StoreEngine createStoreEngine(Store store, Configuration conf, + KVComparator kvComparator) throws IOException { + MobStoreEngine engine = new MobStoreEngine(); + engine.createComponents(conf, store, kvComparator); + return engine; + } + + /** + * Gets the temp directory. + * @return The temp directory. + */ + private Path getTempDir() { + return new Path(homePath, MobConstants.TEMP_DIR_NAME); + } + + /** + * Creates the temp directory of mob files for flushing. + * @param date The latest date of cells in the flushing. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @return The writer for the mob file. + * @throws IOException + */ + public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey) throws IOException { + if (startKey == null) { + startKey = HConstants.EMPTY_START_ROW; + } + Path path = getTempDir(); + return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey); + } + + /** + * Creates the temp directory of mob files for flushing. + * @param date The date string, its format is yyyymmmdd. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @return The writer for the mob file. + * @throws IOException + */ + public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey) throws IOException { + MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID() + .toString().replaceAll("-", "")); + final CacheConfig writerCacheConf = mobCacheConfig; + HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) + .withIncludesMvcc(false).withIncludesTags(true) + .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE) + .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM) + .withBlockSize(getFamily().getBlocksize()) + .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding()).build(); + + StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem()) + .withFilePath(new Path(basePath, mobFileName.getFileName())) + .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE) + .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); + return w; + } + + /** + * Commits the mob file. + * @param sourceFile The source file. + * @param targetPath The directory path where the source file is renamed to. + * @throws IOException + */ + public void commitFile(final Path sourceFile, Path targetPath) throws IOException { + if (sourceFile == null) { + return; + } + Path dstPath = new Path(targetPath, sourceFile.getName()); + validateMobFile(sourceFile); + String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath; + LOG.info(msg); + Path parent = dstPath.getParent(); + if (!region.getFilesystem().exists(parent)) { + region.getFilesystem().mkdirs(parent); + } + if (!region.getFilesystem().rename(sourceFile, dstPath)) { + throw new IOException("Failed rename of " + sourceFile + " to " + dstPath); + } + } + + /** + * Validates a mob file by opening and closing it. + * + * @param path the path to the mob file + */ + private void validateMobFile(Path path) throws IOException { + StoreFile storeFile = null; + try { + storeFile = + new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE); + storeFile.createReader(); + } catch (IOException e) { + LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e); + throw e; + } finally { + if (storeFile != null) { + storeFile.closeReader(false); + } + } + } + + /** + * Reads the cell from the mob file. + * @param reference The cell found in the HBase, its value is a path to a mob file. + * @param cacheBlocks Whether the scanner should cache blocks. + * @return The cell found in the mob file. + * @throws IOException + */ + public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException { + Cell result = null; + if (reference.getValueLength() > Bytes.SIZEOF_LONG) { + String fileName = Bytes.toString(reference.getValueArray(), reference.getValueOffset() + + Bytes.SIZEOF_LONG, reference.getValueLength() - Bytes.SIZEOF_LONG); + Path targetPath = new Path(mobFamilyPath, fileName); + MobFile file = null; + try { + file = mobCacheConfig.getMobFileCache().openFile(region.getFilesystem(), targetPath, + mobCacheConfig); + result = file.readCell(reference, cacheBlocks); + } catch (IOException e) { + LOG.error("Fail to open/read the mob file " + targetPath.toString(), e); + } catch (NullPointerException e) { + // When delete the file during the scan, the hdfs getBlockRange will + // throw NullPointerException, catch it and manage it. + LOG.error("Fail to read the mob file " + targetPath.toString() + + " since it's already deleted", e); + } finally { + if (file != null) { + mobCacheConfig.getMobFileCache().closeFile(file); + } + } + } else { + LOG.warn("Invalid reference to mob, " + reference.getValueLength() + " bytes is too short"); + } + + if (result == null) { + LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family," + + "qualifier,timestamp,type and tags but with an empty value to return."); + result = new KeyValue(reference.getRowArray(), reference.getRowOffset(), + reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(), + reference.getFamilyLength(), reference.getQualifierArray(), + reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(), + Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY, + 0, 0, reference.getTagsArray(), reference.getTagsOffset(), + reference.getTagsLength()); + } + return result; + } + + /** + * Gets the mob file path. + * @return The mob file path. + */ + public Path getPath() { + return mobFamilyPath; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b879e8a73f6..c87c12bfb8c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.CallerDisconnectedException; import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -3552,6 +3553,9 @@ public class HRegion implements HeapSize { // , Writable{ } protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException { + if (MobUtils.isMobFamily(family)) { + return new HMobStore(this, family, this.conf); + } return new HStore(this, family, this.conf); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 8d20d7b445b..45edf7f5d2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; @@ -132,11 +133,11 @@ public class HStore implements Store { protected final MemStore memstore; // This stores directory in the filesystem. - private final HRegion region; + protected final HRegion region; private final HColumnDescriptor family; private final HRegionFileSystem fs; - private final Configuration conf; - private final CacheConfig cacheConf; + protected final Configuration conf; + protected CacheConfig cacheConf; private long lastCompactSize = 0; volatile boolean forceMajor = false; /* how many bytes to write between status checks */ @@ -244,7 +245,7 @@ public class HStore implements Store { this.offPeakHours = OffPeakHours.getInstance(conf); // Setting up cache configuration for this family - this.cacheConf = new CacheConfig(conf, family); + createCacheConf(family); this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); @@ -263,7 +264,7 @@ public class HStore implements Store { "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */); } - this.storeEngine = StoreEngine.create(this, this.conf, this.comparator); + this.storeEngine = createStoreEngine(this, this.conf, this.comparator); this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles()); // Initialize checksum type from name. The names are CRC32, CRC32C, etc. @@ -337,6 +338,27 @@ public class HStore implements Store { } } + /** + * Creates the cache config. + * @param family The current column family. + */ + protected void createCacheConf(final HColumnDescriptor family) { + this.cacheConf = new CacheConfig(conf, family); + } + + /** + * Creates the store engine configured for the given Store. + * @param store The store. An unfortunate dependency needed due to it + * being passed to coprocessors via the compactor. + * @param conf Store configuration. + * @param kvComparator KVComparator for storeFileManager. + * @return StoreEngine to use. + */ + protected StoreEngine createStoreEngine(Store store, Configuration conf, + KVComparator kvComparator) throws IOException { + return StoreEngine.create(store, conf, comparator); + } + /** * @param family * @return TTL in seconds of the specified family @@ -1886,17 +1908,23 @@ public class HStore implements Store { if (this.getCoprocessorHost() != null) { scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols); } - if (scanner == null) { - scanner = scan.isReversed() ? new ReversedStoreScanner(this, - getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this, - getScanInfo(), scan, targetCols, readPt); - } + scanner = createScanner(scan, targetCols, readPt, scanner); return scanner; } finally { lock.readLock().unlock(); } } + protected KeyValueScanner createScanner(Scan scan, final NavigableSet targetCols, + long readPt, KeyValueScanner scanner) throws IOException { + if (scanner == null) { + scanner = scan.isReversed() ? new ReversedStoreScanner(this, + getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this, + getScanInfo(), scan, targetCols, readPt); + } + return scanner; + } + @Override public String toString() { return this.getColumnFamilyName(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java new file mode 100644 index 00000000000..b4bcbe7a029 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mob.MobUtils; + +/** + * Scanner scans both the memstore and the MOB Store. Coalesce KeyValue stream into List + * for a single row. + * + */ +@InterfaceAudience.Private +public class MobStoreScanner extends StoreScanner { + + private boolean cacheMobBlocks = false; + + public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + final NavigableSet columns, long readPt) throws IOException { + super(store, scanInfo, scan, columns, readPt); + cacheMobBlocks = MobUtils.isCacheMobBlocks(scan); + } + + /** + * Firstly reads the cells from the HBase. If the cell are a reference cell (which has the + * reference tag), the scanner need seek this cell from the mob file, and use the cell found + * from the mob file as the result. + */ + @Override + public boolean next(List outResult, int limit) throws IOException { + boolean result = super.next(outResult, limit); + if (!MobUtils.isRawMobScan(scan)) { + // retrieve the mob data + if (outResult.isEmpty()) { + return result; + } + HMobStore mobStore = (HMobStore) store; + for (int i = 0; i < outResult.size(); i++) { + Cell cell = outResult.get(i); + if (MobUtils.isMobReferenceCell(cell)) { + outResult.set(i, mobStore.resolve(cell, cacheMobBlocks)); + } + } + } + return result; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java new file mode 100644 index 00000000000..e384390b20a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mob.MobUtils; + +/** + * ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support + * reversed scanning in both the memstore and the MOB store. + * + */ +@InterfaceAudience.Private +public class ReversedMobStoreScanner extends ReversedStoreScanner { + + private boolean cacheMobBlocks = false; + + ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet columns, + long readPt) throws IOException { + super(store, scanInfo, scan, columns, readPt); + cacheMobBlocks = MobUtils.isCacheMobBlocks(scan); + } + + /** + * Firstly reads the cells from the HBase. If the cell are a reference cell (which has the + * reference tag), the scanner need seek this cell from the mob file, and use the cell found + * from the mob file as the result. + */ + @Override + public boolean next(List outResult, int limit) throws IOException { + boolean result = super.next(outResult, limit); + if (!MobUtils.isRawMobScan(scan)) { + // retrieve the mob data + if (outResult.isEmpty()) { + return result; + } + HMobStore mobStore = (HMobStore) store; + for (int i = 0; i < outResult.size(); i++) { + Cell cell = outResult.get(i); + if (MobUtils.isMobReferenceCell(cell)) { + outResult.set(i, mobStore.resolve(cell, cacheMobBlocks)); + } + } + } + return result; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 27c64f0120b..13ded588be4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -915,7 +915,7 @@ public class StoreFile { return this.writer.getPath(); } - boolean hasGeneralBloom() { + public boolean hasGeneralBloom() { return this.generalBloomFilterWriter != null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java new file mode 100644 index 00000000000..5e28cd96f5c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java @@ -0,0 +1,86 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; + +public class MobTestUtil { + protected static final char FIRST_CHAR = 'a'; + protected static final char LAST_CHAR = 'z'; + + protected static String generateRandomString(int demoLength) { + String base = "abcdefghijklmnopqrstuvwxyz"; + Random random = new Random(); + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < demoLength; i++) { + int number = random.nextInt(base.length()); + sb.append(base.charAt(number)); + } + return sb.toString(); + } + protected static void writeStoreFile(final StoreFile.Writer writer, String caseName) + throws IOException { + writeStoreFile(writer, Bytes.toBytes(caseName), Bytes.toBytes(caseName)); + } + + /* + * Writes HStoreKey and ImmutableBytes data to passed writer and then closes + * it. + * + * @param writer + * + * @throws IOException + */ + private static void writeStoreFile(final StoreFile.Writer writer, byte[] fam, + byte[] qualifier) throws IOException { + long now = System.currentTimeMillis(); + try { + for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) { + for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) { + byte[] b = new byte[] { (byte) d, (byte) e }; + writer.append(new KeyValue(b, fam, qualifier, now, b)); + } + } + } finally { + writer.close(); + } + } + + /** + * Compare two KeyValue only for their row family qualifier value + */ + public static void assertKeyValuesEquals(KeyValue firstKeyValue, + KeyValue secondKeyValue) { + Assert.assertEquals(Bytes.toString(CellUtil.cloneRow(firstKeyValue)), + Bytes.toString(CellUtil.cloneRow(secondKeyValue))); + Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(firstKeyValue)), + Bytes.toString(CellUtil.cloneFamily(secondKeyValue))); + Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(firstKeyValue)), + Bytes.toString(CellUtil.cloneQualifier(secondKeyValue))); + Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(firstKeyValue)), + Bytes.toString(CellUtil.cloneValue(secondKeyValue))); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java new file mode 100644 index 00000000000..b39dd2ae11f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java @@ -0,0 +1,154 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestCachedMobFile extends TestCase{ + static final Log LOG = LogFactory.getLog(TestCachedMobFile.class); + private Configuration conf = HBaseConfiguration.create(); + private CacheConfig cacheConf = new CacheConfig(conf); + private final String TABLE = "tableName"; + private final String FAMILY = "familyName"; + private final String FAMILY1 = "familyName1"; + private final String FAMILY2 = "familyName2"; + private final long EXPECTED_REFERENCE_ZERO = 0; + private final long EXPECTED_REFERENCE_ONE = 1; + private final long EXPECTED_REFERENCE_TWO = 2; + + @Test + public void testOpenClose() throws Exception { + String caseName = getName(); + FileSystem fs = FileSystem.get(conf); + Path testDir = FSUtils.getRootDir(conf); + Path outputDir = new Path(new Path(testDir, TABLE), + FAMILY); + HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build(); + StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs) + .withOutputDir(outputDir).withFileContext(meta).build(); + MobTestUtil.writeStoreFile(writer, caseName); + CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf); + Assert.assertEquals(EXPECTED_REFERENCE_ZERO, cachedMobFile.getReferenceCount()); + cachedMobFile.open(); + Assert.assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile.getReferenceCount()); + cachedMobFile.open(); + Assert.assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile.getReferenceCount()); + cachedMobFile.close(); + Assert.assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile.getReferenceCount()); + cachedMobFile.close(); + Assert.assertEquals(EXPECTED_REFERENCE_ZERO, cachedMobFile.getReferenceCount()); + } + + @Test + public void testCompare() throws Exception { + String caseName = getName(); + FileSystem fs = FileSystem.get(conf); + Path testDir = FSUtils.getRootDir(conf); + Path outputDir1 = new Path(new Path(testDir, TABLE), + FAMILY1); + HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); + StoreFile.Writer writer1 = new StoreFile.WriterBuilder(conf, cacheConf, fs) + .withOutputDir(outputDir1).withFileContext(meta).build(); + MobTestUtil.writeStoreFile(writer1, caseName); + CachedMobFile cachedMobFile1 = CachedMobFile.create(fs, writer1.getPath(), conf, cacheConf); + Path outputDir2 = new Path(new Path(testDir, TABLE), + FAMILY2); + StoreFile.Writer writer2 = new StoreFile.WriterBuilder(conf, cacheConf, fs) + .withOutputDir(outputDir2) + .withFileContext(meta) + .build(); + MobTestUtil.writeStoreFile(writer2, caseName); + CachedMobFile cachedMobFile2 = CachedMobFile.create(fs, writer2.getPath(), conf, cacheConf); + cachedMobFile1.access(1); + cachedMobFile2.access(2); + Assert.assertEquals(cachedMobFile1.compareTo(cachedMobFile2), 1); + Assert.assertEquals(cachedMobFile2.compareTo(cachedMobFile1), -1); + Assert.assertEquals(cachedMobFile1.compareTo(cachedMobFile1), 0); + } + + @Test + public void testReadKeyValue() throws Exception { + FileSystem fs = FileSystem.get(conf); + Path testDir = FSUtils.getRootDir(conf); + Path outputDir = new Path(new Path(testDir, TABLE), "familyname"); + HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); + StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs) + .withOutputDir(outputDir).withFileContext(meta).build(); + String caseName = getName(); + MobTestUtil.writeStoreFile(writer, caseName); + CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf); + byte[] family = Bytes.toBytes(caseName); + byte[] qualify = Bytes.toBytes(caseName); + // Test the start key + byte[] startKey = Bytes.toBytes("aa"); // The start key bytes + KeyValue expectedKey = + new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey); + KeyValue seekKey = expectedKey.createKeyOnly(false); + KeyValue kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false)); + MobTestUtil.assertKeyValuesEquals(expectedKey, kv); + + // Test the end key + byte[] endKey = Bytes.toBytes("zz"); // The end key bytes + expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey); + seekKey = expectedKey.createKeyOnly(false); + kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false)); + MobTestUtil.assertKeyValuesEquals(expectedKey, kv); + + // Test the random key + byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2)); + expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey); + seekKey = expectedKey.createKeyOnly(false); + kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false)); + MobTestUtil.assertKeyValuesEquals(expectedKey, kv); + + // Test the key which is less than the start key + byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa" + expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey); + seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey); + kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false)); + MobTestUtil.assertKeyValuesEquals(expectedKey, kv); + + // Test the key which is more than the end key + byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz" + seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey); + kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false)); + Assert.assertNull(kv); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java new file mode 100644 index 00000000000..5d85718ee43 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java @@ -0,0 +1,193 @@ +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestDefaultMobStoreFlusher { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static byte [] row1 = Bytes.toBytes("row1"); + private final static byte [] row2 = Bytes.toBytes("row2"); + private final static byte [] family = Bytes.toBytes("family"); + private final static byte [] qf1 = Bytes.toBytes("qf1"); + private final static byte [] qf2 = Bytes.toBytes("qf2"); + private final static byte [] value1 = Bytes.toBytes("value1"); + private final static byte [] value2 = Bytes.toBytes("value2"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testFlushNonMobFile() throws InterruptedException { + String TN = "testFlushNonMobFile"; + HTable table = null; + HBaseAdmin admin = null; + + try { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TN)); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setMaxVersions(4); + desc.addFamily(hcd); + + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + admin.createTable(desc); + table = new HTable(TEST_UTIL.getConfiguration(), TN); + + //Put data + Put put0 = new Put(row1); + put0.add(family, qf1, 1, value1); + table.put(put0); + + //Put more data + Put put1 = new Put(row2); + put1.add(family, qf2, 1, value2); + table.put(put1); + + //Flush + table.flushCommits(); + admin.flush(TN); + + Scan scan = new Scan(); + scan.addColumn(family, qf1); + scan.setMaxVersions(4); + ResultScanner scanner = table.getScanner(scan); + + //Compare + Result result = scanner.next(); + int size = 0; + while (result != null) { + size++; + List cells = result.getColumnCells(family, qf1); + // Verify the cell size + Assert.assertEquals(1, cells.size()); + // Verify the value + Assert.assertEquals(Bytes.toString(value1), + Bytes.toString(CellUtil.cloneValue(cells.get(0)))); + result = scanner.next(); + } + scanner.close(); + Assert.assertEquals(1, size); + admin.close(); + } catch (MasterNotRunningException e1) { + e1.printStackTrace(); + } catch (ZooKeeperConnectionException e2) { + e2.printStackTrace(); + } catch (IOException e3) { + e3.printStackTrace(); + } + } + + @Test + public void testFlushMobFile() throws InterruptedException { + String TN = "testFlushMobFile"; + HTable table = null; + HBaseAdmin admin = null; + + try { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TN)); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L)); + hcd.setMaxVersions(4); + desc.addFamily(hcd); + + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + admin.createTable(desc); + table = new HTable(TEST_UTIL.getConfiguration(), TN); + + //put data + Put put0 = new Put(row1); + put0.add(family, qf1, 1, value1); + table.put(put0); + + //put more data + Put put1 = new Put(row2); + put1.add(family, qf2, 1, value2); + table.put(put1); + + //flush + table.flushCommits(); + admin.flush(TN); + + //Scan + Scan scan = new Scan(); + scan.addColumn(family, qf1); + scan.setMaxVersions(4); + ResultScanner scanner = table.getScanner(scan); + + //Compare + Result result = scanner.next(); + int size = 0; + while (result != null) { + size++; + List cells = result.getColumnCells(family, qf1); + // Verify the the cell size + Assert.assertEquals(1, cells.size()); + // Verify the value + Assert.assertEquals(Bytes.toString(value1), + Bytes.toString(CellUtil.cloneValue(cells.get(0)))); + result = scanner.next(); + } + scanner.close(); + Assert.assertEquals(1, size); + admin.close(); + } catch (MasterNotRunningException e1) { + e1.printStackTrace(); + } catch (ZooKeeperConnectionException e2) { + e2.printStackTrace(); + } catch (IOException e3) { + e3.printStackTrace(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java new file mode 100644 index 00000000000..ae10aad6204 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java @@ -0,0 +1,141 @@ +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.hbase.mob; + +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestMobDataBlockEncoding { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static byte [] row1 = Bytes.toBytes("row1"); + private final static byte [] family = Bytes.toBytes("family"); + private final static byte [] qf1 = Bytes.toBytes("qualifier1"); + private final static byte [] qf2 = Bytes.toBytes("qualifier2"); + protected final byte[] qf3 = Bytes.toBytes("qualifier3"); + private static HTable table; + private static HBaseAdmin admin; + private static HColumnDescriptor hcd; + private static HTableDescriptor desc; + private static Random random = new Random(); + private static long defaultThreshold = 10; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + public void setUp(long threshold, String TN, DataBlockEncoding encoding) + throws Exception { + desc = new HTableDescriptor(TableName.valueOf(TN)); + hcd = new HColumnDescriptor(family); + hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(threshold)); + hcd.setMaxVersions(4); + hcd.setDataBlockEncoding(encoding); + desc.addFamily(hcd); + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + admin.createTable(desc); + table = new HTable(TEST_UTIL.getConfiguration(), TN); + } + + /** + * Generate the mob value. + * + * @param size the size of the value + * @return the mob value generated + */ + private static byte[] generateMobValue(int size) { + byte[] mobVal = new byte[size]; + random.nextBytes(mobVal); + return mobVal; + } + + @Test + public void testDataBlockEncoding() throws Exception { + for (DataBlockEncoding encoding : DataBlockEncoding.values()) { + testDataBlockEncoding(encoding); + } + } + + public void testDataBlockEncoding(DataBlockEncoding encoding) throws Exception { + String TN = "testDataBlockEncoding" + encoding; + setUp(defaultThreshold, TN, encoding); + long ts1 = System.currentTimeMillis(); + long ts2 = ts1 + 1; + long ts3 = ts1 + 2; + byte[] value = generateMobValue((int) defaultThreshold + 1); + + Put put1 = new Put(row1); + put1.add(family, qf1, ts3, value); + put1.add(family, qf2, ts2, value); + put1.add(family, qf3, ts1, value); + table.put(put1); + + table.flushCommits(); + admin.flush(TN); + + Scan scan = new Scan(); + scan.setMaxVersions(4); + + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + List cells = res.listCells(); + for(Cell cell : cells) { + // Verify the value + Assert.assertEquals(Bytes.toString(value), + Bytes.toString(CellUtil.cloneValue(cell))); + count++; + } + } + results.close(); + Assert.assertEquals(3, count); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java new file mode 100644 index 00000000000..f6511f71f25 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java @@ -0,0 +1,124 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestMobFile extends TestCase { + static final Log LOG = LogFactory.getLog(TestMobFile.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private Configuration conf = TEST_UTIL.getConfiguration(); + private CacheConfig cacheConf = new CacheConfig(conf); + private final String TABLE = "tableName"; + private final String FAMILY = "familyName"; + + @Test + public void testReadKeyValue() throws Exception { + FileSystem fs = FileSystem.get(conf); + Path testDir = FSUtils.getRootDir(conf); + Path outputDir = new Path(new Path(testDir, TABLE), FAMILY); + HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build(); + StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs) + .withOutputDir(outputDir) + .withFileContext(meta) + .build(); + String caseName = getName(); + MobTestUtil.writeStoreFile(writer, caseName); + + MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(), + conf, cacheConf, BloomType.NONE)); + byte[] family = Bytes.toBytes(caseName); + byte[] qualify = Bytes.toBytes(caseName); + + // Test the start key + byte[] startKey = Bytes.toBytes("aa"); // The start key bytes + KeyValue expectedKey = + new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey); + KeyValue seekKey = expectedKey.createKeyOnly(false); + KeyValue kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false)); + MobTestUtil.assertKeyValuesEquals(expectedKey, kv); + + // Test the end key + byte[] endKey = Bytes.toBytes("zz"); // The end key bytes + expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey); + seekKey = expectedKey.createKeyOnly(false); + kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false)); + MobTestUtil.assertKeyValuesEquals(expectedKey, kv); + + // Test the random key + byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2)); + expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey); + seekKey = expectedKey.createKeyOnly(false); + kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false)); + MobTestUtil.assertKeyValuesEquals(expectedKey, kv); + + // Test the key which is less than the start key + byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa" + expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey); + seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey); + kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false)); + MobTestUtil.assertKeyValuesEquals(expectedKey, kv); + + // Test the key which is more than the end key + byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz" + seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey); + kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false)); + assertNull(kv); + } + + @Test + public void testGetScanner() throws Exception { + FileSystem fs = FileSystem.get(conf); + Path testDir = FSUtils.getRootDir(conf); + Path outputDir = new Path(new Path(testDir, TABLE), FAMILY); + HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build(); + StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs) + .withOutputDir(outputDir) + .withFileContext(meta) + .build(); + MobTestUtil.writeStoreFile(writer, getName()); + + MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(), + conf, cacheConf, BloomType.NONE)); + assertNotNull(mobFile.getScanner()); + assertTrue(mobFile.getScanner() instanceof StoreFileScanner); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java new file mode 100644 index 00000000000..9478544cbe9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java @@ -0,0 +1,206 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.Date; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HMobStore; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestMobFileCache extends TestCase { + static final Log LOG = LogFactory.getLog(TestMobFileCache.class); + private HBaseTestingUtility UTIL; + private HRegion region; + private Configuration conf; + private MobCacheConfig mobCacheConf; + private MobFileCache mobFileCache; + private Date currentDate = new Date(); + private final String TEST_CACHE_SIZE = "2"; + private final int EXPECTED_CACHE_SIZE_ZERO = 0; + private final int EXPECTED_CACHE_SIZE_ONE = 1; + private final int EXPECTED_CACHE_SIZE_TWO = 2; + private final int EXPECTED_CACHE_SIZE_THREE = 3; + private final long EXPECTED_REFERENCE_ONE = 1; + private final long EXPECTED_REFERENCE_TWO = 2; + + private final String TABLE = "tableName"; + private final String FAMILY1 = "family1"; + private final String FAMILY2 = "family2"; + private final String FAMILY3 = "family3"; + + private final byte[] ROW = Bytes.toBytes("row"); + private final byte[] ROW2 = Bytes.toBytes("row2"); + private final byte[] VALUE = Bytes.toBytes("value"); + private final byte[] VALUE2 = Bytes.toBytes("value2"); + private final byte[] QF1 = Bytes.toBytes("qf1"); + private final byte[] QF2 = Bytes.toBytes("qf2"); + private final byte[] QF3 = Bytes.toBytes("qf3"); + + @Override + public void setUp() throws Exception { + UTIL = HBaseTestingUtility.createLocalHTU(); + conf = UTIL.getConfiguration(); + HTableDescriptor htd = UTIL.createTableDescriptor("testMobFileCache"); + HColumnDescriptor hcd1 = new HColumnDescriptor(FAMILY1); + hcd1.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + hcd1.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L)); + HColumnDescriptor hcd2 = new HColumnDescriptor(FAMILY2); + hcd2.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + hcd2.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L)); + HColumnDescriptor hcd3 = new HColumnDescriptor(FAMILY3); + hcd3.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + hcd3.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L)); + htd.addFamily(hcd1); + htd.addFamily(hcd2); + htd.addFamily(hcd3); + region = UTIL.createLocalHRegion(htd, null, null); + } + + @Override + protected void tearDown() throws Exception { + region.close(); + region.getFilesystem().delete(UTIL.getDataTestDir(), true); + } + + /** + * Create the mob store file. + * @param family + */ + private Path createMobStoreFile(String family) throws IOException { + return createMobStoreFile(HBaseConfiguration.create(), family); + } + + /** + * Create the mob store file + * @param conf + * @param family + */ + private Path createMobStoreFile(Configuration conf, String family) throws IOException { + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setMaxVersions(4); + hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + mobCacheConf = new MobCacheConfig(conf, hcd); + return createMobStoreFile(conf, hcd); + } + + /** + * Create the mob store file + * @param conf + * @param hcd + */ + private Path createMobStoreFile(Configuration conf, HColumnDescriptor hcd) + throws IOException { + // Setting up a Store + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE)); + htd.addFamily(hcd); + HMobStore mobStore = (HMobStore) region.getStore(hcd.getName()); + KeyValue key1 = new KeyValue(ROW, hcd.getName(), QF1, 1, VALUE); + KeyValue key2 = new KeyValue(ROW, hcd.getName(), QF2, 1, VALUE); + KeyValue key3 = new KeyValue(ROW2, hcd.getName(), QF3, 1, VALUE2); + KeyValue[] keys = new KeyValue[] { key1, key2, key3 }; + int maxKeyCount = keys.length; + HRegionInfo regionInfo = new HRegionInfo(); + StoreFile.Writer mobWriter = mobStore.createWriterInTmp(currentDate, + maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey()); + Path mobFilePath = mobWriter.getPath(); + String fileName = mobFilePath.getName(); + mobWriter.append(key1); + mobWriter.append(key2); + mobWriter.append(key3); + mobWriter.close(); + String targetPathName = MobUtils.formatDate(currentDate); + Path targetPath = new Path(mobStore.getPath(), targetPathName); + mobStore.commitFile(mobFilePath, targetPath); + return new Path(targetPath, fileName); + } + + @Test + public void testMobFileCache() throws Exception { + FileSystem fs = FileSystem.get(conf); + conf.set(MobConstants.MOB_FILE_CACHE_SIZE_KEY, TEST_CACHE_SIZE); + mobFileCache = new MobFileCache(conf); + Path file1Path = createMobStoreFile(FAMILY1); + Path file2Path = createMobStoreFile(FAMILY2); + Path file3Path = createMobStoreFile(FAMILY3); + + // Before open one file by the MobFileCache + assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize()); + // Open one file by the MobFileCache + CachedMobFile cachedMobFile1 = (CachedMobFile) mobFileCache.openFile( + fs, file1Path, mobCacheConf); + assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize()); + assertNotNull(cachedMobFile1); + assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount()); + + // The evict is also managed by a schedule thread pool. + // And its check period is set as 3600 seconds by default. + // This evict should get the lock at the most time + mobFileCache.evict(); // Cache not full, evict it + assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize()); + assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount()); + + mobFileCache.evictFile(file1Path.getName()); // Evict one file + assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize()); + assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile1.getReferenceCount()); + + cachedMobFile1.close(); // Close the cached mob file + + // Reopen three cached file + cachedMobFile1 = (CachedMobFile) mobFileCache.openFile( + fs, file1Path, mobCacheConf); + assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize()); + CachedMobFile cachedMobFile2 = (CachedMobFile) mobFileCache.openFile( + fs, file2Path, mobCacheConf); + assertEquals(EXPECTED_CACHE_SIZE_TWO, mobFileCache.getCacheSize()); + CachedMobFile cachedMobFile3 = (CachedMobFile) mobFileCache.openFile( + fs, file3Path, mobCacheConf); + // Before the evict + // Evict the cache, should clost the first file 1 + assertEquals(EXPECTED_CACHE_SIZE_THREE, mobFileCache.getCacheSize()); + assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount()); + assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile2.getReferenceCount()); + assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile3.getReferenceCount()); + mobFileCache.evict(); + assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize()); + assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile1.getReferenceCount()); + assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile2.getReferenceCount()); + assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile3.getReferenceCount()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java new file mode 100644 index 00000000000..9a6cf7f879f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java @@ -0,0 +1,79 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.util.Date; +import java.util.Random; +import java.util.UUID; + +import junit.framework.TestCase; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.MD5Hash; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestMobFileName extends TestCase { + + private String uuid; + private Date date; + private String dateStr; + private byte[] startKey; + + public void setUp() { + Random random = new Random(); + uuid = UUID.randomUUID().toString().replaceAll("-", ""); + date = new Date(); + dateStr = MobUtils.formatDate(date); + startKey = Bytes.toBytes(random.nextInt()); + } + + @Test + public void testHashCode() { + assertEquals(MobFileName.create(startKey, dateStr, uuid).hashCode(), + MobFileName.create(startKey, dateStr, uuid).hashCode()); + assertNotSame(MobFileName.create(startKey, dateStr, uuid).hashCode(), + MobFileName.create(startKey, dateStr, uuid).hashCode()); + } + + @Test + public void testCreate() { + MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid); + assertEquals(mobFileName, MobFileName.create(mobFileName.getFileName())); + } + + @Test + public void testGet() { + MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid); + assertEquals(MD5Hash.getMD5AsHex(startKey, 0, startKey.length), mobFileName.getStartKey()); + assertEquals(dateStr, mobFileName.getDate()); + assertEquals(mobFileName.getFileName(), MD5Hash.getMD5AsHex(startKey, 0, startKey.length) + + dateStr + uuid); + } + + @Test + public void testEquals() { + MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid); + assertTrue(mobFileName.equals(mobFileName)); + assertFalse(mobFileName.equals(this)); + assertTrue(mobFileName.equals(MobFileName.create(startKey, dateStr, uuid))); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java new file mode 100644 index 00000000000..ed4b703486e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestDeleteMobTable { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static byte[] FAMILY = Bytes.toBytes("family"); + private final static byte[] QF = Bytes.toBytes("qualifier"); + private static Random random = new Random(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0); + TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Generate the mob value. + * + * @param size + * the size of the value + * @return the mob value generated + */ + private static byte[] generateMobValue(int size) { + byte[] mobVal = new byte[size]; + random.nextBytes(mobVal); + return mobVal; + } + + @Test + public void testDeleteMobTable() throws Exception { + byte[] tableName = Bytes.toBytes("testDeleteMobTable"); + TableName tn = TableName.valueOf(tableName); + HTableDescriptor htd = new HTableDescriptor(tn); + HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); + hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L)); + htd.addFamily(hcd); + HBaseAdmin admin = null; + HTable table = null; + try { + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + admin.createTable(htd); + table = new HTable(TEST_UTIL.getConfiguration(), tableName); + byte[] value = generateMobValue(10); + + byte[] row = Bytes.toBytes("row"); + Put put = new Put(row); + put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value); + table.put(put); + + table.flushCommits(); + admin.flush(tableName); + + // the mob file exists + Assert.assertEquals(1, countMobFiles(tn, hcd.getNameAsString())); + Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString())); + String fileName = assertHasOneMobRow(table, tn, hcd.getNameAsString()); + Assert.assertFalse(mobArchiveExist(tn, hcd.getNameAsString(), fileName)); + Assert.assertTrue(mobTableDirExist(tn)); + table.close(); + + admin.disableTable(tn); + admin.deleteTable(tn); + + Assert.assertFalse(admin.tableExists(tn)); + Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString())); + Assert.assertEquals(1, countArchiveMobFiles(tn, hcd.getNameAsString())); + Assert.assertTrue(mobArchiveExist(tn, hcd.getNameAsString(), fileName)); + Assert.assertFalse(mobTableDirExist(tn)); + } finally { + if (admin != null) { + admin.close(); + } + } + } + + @Test + public void testDeleteNonMobTable() throws Exception { + byte[] tableName = Bytes.toBytes("testDeleteNonMobTable"); + TableName tn = TableName.valueOf(tableName); + HTableDescriptor htd = new HTableDescriptor(tn); + HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); + htd.addFamily(hcd); + HBaseAdmin admin = null; + HTable table = null; + try { + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + admin.createTable(htd); + table = new HTable(TEST_UTIL.getConfiguration(), tableName); + byte[] value = generateMobValue(10); + + byte[] row = Bytes.toBytes("row"); + Put put = new Put(row); + put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value); + table.put(put); + + table.flushCommits(); + admin.flush(tableName); + table.close(); + + // the mob file doesn't exist + Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString())); + Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString())); + Assert.assertFalse(mobTableDirExist(tn)); + + admin.disableTable(tn); + admin.deleteTable(tn); + + Assert.assertFalse(admin.tableExists(tn)); + Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString())); + Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString())); + Assert.assertFalse(mobTableDirExist(tn)); + } finally { + if (admin != null) { + admin.close(); + } + } + } + + private int countMobFiles(TableName tn, String familyName) throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path mobFileDir = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName); + if (fs.exists(mobFileDir)) { + return fs.listStatus(mobFileDir).length; + } else { + return 0; + } + } + + private int countArchiveMobFiles(TableName tn, String familyName) + throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn, + MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName); + if (fs.exists(storePath)) { + return fs.listStatus(storePath).length; + } else { + return 0; + } + } + + private boolean mobTableDirExist(TableName tn) throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path tableDir = FSUtils.getTableDir(MobUtils.getMobHome(TEST_UTIL.getConfiguration()), tn); + return fs.exists(tableDir); + } + + private boolean mobArchiveExist(TableName tn, String familyName, String fileName) + throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn, + MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName); + return fs.exists(new Path(storePath, fileName)); + } + + private String assertHasOneMobRow(HTable table, TableName tn, String familyName) + throws IOException { + Scan scan = new Scan(); + scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); + ResultScanner rs = table.getScanner(scan); + Result r = rs.next(); + Assert.assertNotNull(r); + byte[] value = r.getValue(FAMILY, QF); + String fileName = Bytes.toString(value, Bytes.SIZEOF_LONG, value.length - Bytes.SIZEOF_LONG); + Path filePath = new Path( + MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName), fileName); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Assert.assertTrue(fs.exists(filePath)); + r = rs.next(); + Assert.assertNull(r); + return fileName; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java new file mode 100644 index 00000000000..c253611a677 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -0,0 +1,471 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.Mockito; + +@Category(MediumTests.class) +public class TestHMobStore { + public static final Log LOG = LogFactory.getLog(TestHMobStore.class); + @Rule public TestName name = new TestName(); + + private HMobStore store; + private HRegion region; + private HColumnDescriptor hcd; + private FileSystem fs; + private byte [] table = Bytes.toBytes("table"); + private byte [] family = Bytes.toBytes("family"); + private byte [] row = Bytes.toBytes("row"); + private byte [] row2 = Bytes.toBytes("row2"); + private byte [] qf1 = Bytes.toBytes("qf1"); + private byte [] qf2 = Bytes.toBytes("qf2"); + private byte [] qf3 = Bytes.toBytes("qf3"); + private byte [] qf4 = Bytes.toBytes("qf4"); + private byte [] qf5 = Bytes.toBytes("qf5"); + private byte [] qf6 = Bytes.toBytes("qf6"); + private byte[] value = Bytes.toBytes("value"); + private byte[] value2 = Bytes.toBytes("value2"); + private Path mobFilePath; + private Date currentDate = new Date(); + private KeyValue seekKey1; + private KeyValue seekKey2; + private KeyValue seekKey3; + private NavigableSet qualifiers = + new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); + private List expected = new ArrayList(); + private long id = System.currentTimeMillis(); + private Get get = new Get(row); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final String DIR = TEST_UTIL.getDataTestDir("TestHMobStore").toString(); + + /** + * Setup + * @throws Exception + */ + @Before + public void setUp() throws Exception { + qualifiers.add(qf1); + qualifiers.add(qf3); + qualifiers.add(qf5); + + Iterator iter = qualifiers.iterator(); + while(iter.hasNext()){ + byte [] next = iter.next(); + expected.add(new KeyValue(row, family, next, 1, value)); + get.addColumn(family, next); + get.setMaxVersions(); // all versions. + } + } + + private void init(String methodName, Configuration conf, boolean testStore) + throws IOException { + hcd = new HColumnDescriptor(family); + hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L)); + hcd.setMaxVersions(4); + init(methodName, conf, hcd, testStore); + } + + private void init(String methodName, Configuration conf, + HColumnDescriptor hcd, boolean testStore) throws IOException { + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); + init(methodName, conf, htd, hcd, testStore); + } + + private void init(String methodName, Configuration conf, HTableDescriptor htd, + HColumnDescriptor hcd, boolean testStore) throws IOException { + //Setting up tje Region and Store + Path basedir = new Path(DIR+methodName); + Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); + String logName = "logs"; + Path logdir = new Path(basedir, logName); + FileSystem fs = FileSystem.get(conf); + fs.delete(logdir, true); + + htd.addFamily(hcd); + HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); + HLog hlog = HLogFactory.createHLog(fs, basedir, logName, conf); + region = new HRegion(tableDir, hlog, fs, conf, info, htd, null); + store = new HMobStore(region, hcd, conf); + if(testStore) { + init(conf, hcd); + } + } + + private void init(Configuration conf, HColumnDescriptor hcd) + throws IOException { + Path basedir = FSUtils.getRootDir(conf); + fs = FileSystem.get(conf); + Path homePath = new Path(basedir, Bytes.toString(family) + Path.SEPARATOR + + Bytes.toString(family)); + fs.mkdirs(homePath); + + KeyValue key1 = new KeyValue(row, family, qf1, 1, value); + KeyValue key2 = new KeyValue(row, family, qf2, 1, value); + KeyValue key3 = new KeyValue(row2, family, qf3, 1, value2); + KeyValue[] keys = new KeyValue[] { key1, key2, key3 }; + int maxKeyCount = keys.length; + StoreFile.Writer mobWriter = store.createWriterInTmp(currentDate, + maxKeyCount, hcd.getCompactionCompression(), region.getStartKey()); + mobFilePath = mobWriter.getPath(); + + mobWriter.append(key1); + mobWriter.append(key2); + mobWriter.append(key3); + mobWriter.close(); + + long valueLength1 = key1.getValueLength(); + long valueLength2 = key2.getValueLength(); + long valueLength3 = key3.getValueLength(); + + String targetPathName = MobUtils.formatDate(currentDate); + byte[] referenceValue = + Bytes.toBytes(targetPathName + Path.SEPARATOR + + mobFilePath.getName()); + byte[] newReferenceValue1 = Bytes.add(Bytes.toBytes(valueLength1), referenceValue); + byte[] newReferenceValue2 = Bytes.add(Bytes.toBytes(valueLength2), referenceValue); + byte[] newReferenceValue3 = Bytes.add(Bytes.toBytes(valueLength3), referenceValue); + seekKey1 = new KeyValue(row, family, qf1, Long.MAX_VALUE, newReferenceValue1); + seekKey2 = new KeyValue(row, family, qf2, Long.MAX_VALUE, newReferenceValue2); + seekKey3 = new KeyValue(row2, family, qf3, Long.MAX_VALUE, newReferenceValue3); + } + + /** + * Getting data from memstore + * @throws IOException + */ + @Test + public void testGetFromMemStore() throws IOException { + final Configuration conf = HBaseConfiguration.create(); + init(name.getMethodName(), conf, false); + + //Put data in memstore + this.store.add(new KeyValue(row, family, qf1, 1, value)); + this.store.add(new KeyValue(row, family, qf2, 1, value)); + this.store.add(new KeyValue(row, family, qf3, 1, value)); + this.store.add(new KeyValue(row, family, qf4, 1, value)); + this.store.add(new KeyValue(row, family, qf5, 1, value)); + this.store.add(new KeyValue(row, family, qf6, 1, value)); + + Scan scan = new Scan(get); + InternalScanner scanner = (InternalScanner) store.getScanner(scan, + scan.getFamilyMap().get(store.getFamily().getName()), + 0); + + List results = new ArrayList(); + scanner.next(results); + Collections.sort(results, KeyValue.COMPARATOR); + scanner.close(); + + //Compare + Assert.assertEquals(expected.size(), results.size()); + for(int i=0; i results = new ArrayList(); + scanner.next(results); + Collections.sort(results, KeyValue.COMPARATOR); + scanner.close(); + + //Compare + Assert.assertEquals(expected.size(), results.size()); + for(int i=0; i results = new ArrayList(); + scanner.next(results); + Collections.sort(results, KeyValue.COMPARATOR); + scanner.close(); + + //Compare + Assert.assertEquals(expected.size(), results.size()); + for(int i=0; i results = new ArrayList(); + scanner.next(results); + Collections.sort(results, KeyValue.COMPARATOR); + scanner.close(); + + //Compare + Assert.assertEquals(expected.size(), results.size()); + for(int i=0; i results = new ArrayList(); + scanner.next(results); + Collections.sort(results, KeyValue.COMPARATOR); + scanner.close(); + + //Compare + Assert.assertEquals(expected.size(), results.size()); + for(int i=0; i cells = res.listCells(); + for(Cell cell : cells) { + // Verify the value + Assert.assertEquals(Bytes.toString(value), + Bytes.toString(CellUtil.cloneValue(cell))); + count++; + } + } + results.close(); + Assert.assertEquals(3, count); + } + + public void testGetFromMemStore(boolean reversed) throws Exception { + String TN = "testGetFromMemStore" + reversed; + setUp(defaultThreshold, TN); + long ts1 = System.currentTimeMillis(); + long ts2 = ts1 + 1; + long ts3 = ts1 + 2; + byte [] value = generateMobValue((int)defaultThreshold+1);; + + Put put1 = new Put(row1); + put1.add(family, qf1, ts3, value); + put1.add(family, qf2, ts2, value); + put1.add(family, qf3, ts1, value); + table.put(put1); + + Scan scan = new Scan(); + setScan(scan, reversed, false); + + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + List cells = res.listCells(); + for(Cell cell : cells) { + // Verify the value + Assert.assertEquals(Bytes.toString(value), + Bytes.toString(CellUtil.cloneValue(cell))); + count++; + } + } + results.close(); + Assert.assertEquals(3, count); + } + + public void testGetReferences(boolean reversed) throws Exception { + String TN = "testGetReferences" + reversed; + setUp(defaultThreshold, TN); + long ts1 = System.currentTimeMillis(); + long ts2 = ts1 + 1; + long ts3 = ts1 + 2; + byte [] value = generateMobValue((int)defaultThreshold+1);; + + Put put1 = new Put(row1); + put1.add(family, qf1, ts3, value); + put1.add(family, qf2, ts2, value); + put1.add(family, qf3, ts1, value); + table.put(put1); + + table.flushCommits(); + admin.flush(TN); + + Scan scan = new Scan(); + setScan(scan, reversed, true); + + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + List cells = res.listCells(); + for(Cell cell : cells) { + // Verify the value + assertIsMobReference(cell, row1, family, value, TN); + count++; + } + } + results.close(); + Assert.assertEquals(3, count); + } + + public void testMobThreshold(boolean reversed) throws Exception { + String TN = "testMobThreshold" + reversed; + setUp(defaultThreshold, TN); + byte [] valueLess = generateMobValue((int)defaultThreshold-1); + byte [] valueEqual = generateMobValue((int)defaultThreshold); + byte [] valueGreater = generateMobValue((int)defaultThreshold+1); + long ts1 = System.currentTimeMillis(); + long ts2 = ts1 + 1; + long ts3 = ts1 + 2; + + Put put1 = new Put(row1); + put1.add(family, qf1, ts3, valueLess); + put1.add(family, qf2, ts2, valueEqual); + put1.add(family, qf3, ts1, valueGreater); + table.put(put1); + + table.flushCommits(); + admin.flush(TN); + + Scan scan = new Scan(); + setScan(scan, reversed, true); + + Cell cellLess= null; + Cell cellEqual = null; + Cell cellGreater = null; + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + List cells = res.listCells(); + for(Cell cell : cells) { + // Verify the value + String qf = Bytes.toString(CellUtil.cloneQualifier(cell)); + if(qf.equals(Bytes.toString(qf1))) { + cellLess = cell; + } + if(qf.equals(Bytes.toString(qf2))) { + cellEqual = cell; + } + if(qf.equals(Bytes.toString(qf3))) { + cellGreater = cell; + } + count++; + } + } + Assert.assertEquals(3, count); + assertNotMobReference(cellLess, row1, family, valueLess); + assertNotMobReference(cellEqual, row1, family, valueEqual); + assertIsMobReference(cellGreater, row1, family, valueGreater, TN); + results.close(); + } + + /** + * Assert the value is not store in mob. + */ + private static void assertNotMobReference(Cell cell, byte[] row, byte[] family, + byte[] value) throws IOException { + Assert.assertEquals(Bytes.toString(row), + Bytes.toString(CellUtil.cloneRow(cell))); + Assert.assertEquals(Bytes.toString(family), + Bytes.toString(CellUtil.cloneFamily(cell))); + Assert.assertTrue(Bytes.toString(value).equals( + Bytes.toString(CellUtil.cloneValue(cell)))); + } + + /** + * Assert the value is store in mob. + */ + private static void assertIsMobReference(Cell cell, byte[] row, byte[] family, + byte[] value, String TN) throws IOException { + Assert.assertEquals(Bytes.toString(row), + Bytes.toString(CellUtil.cloneRow(cell))); + Assert.assertEquals(Bytes.toString(family), + Bytes.toString(CellUtil.cloneFamily(cell))); + Assert.assertFalse(Bytes.toString(value).equals( + Bytes.toString(CellUtil.cloneValue(cell)))); + byte[] referenceValue = CellUtil.cloneValue(cell); + String fileName = Bytes.toString(referenceValue, 8, referenceValue.length-8); + Path mobFamilyPath; + mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(), + TableName.valueOf(TN)), hcd.getNameAsString()); + Path targetPath = new Path(mobFamilyPath, fileName); + FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); + Assert.assertTrue(fs.exists(targetPath)); + } +}