HBASE-11861 Native MOB Compaction mechanisms (Jingcheng Du)

This commit is contained in:
Jonathan M Hsieh 2015-02-06 05:37:13 -08:00
parent fbbb3249d9
commit 2c4934eda6
14 changed files with 2426 additions and 14 deletions

View File

@@ -1517,4 +1517,55 @@ possible configurations would overwhelm and obscure the important.
The default value is one day.
</description>
</property>
<property>
<name>hbase.mob.file.compaction.mergeable.threshold</name>
<value>201326592</value>
<description>
If the size of a mob file is less than this value, it's regarded as a small
file and needs to be merged in mob file compaction. The default value is 192MB.
</description>
</property>
<property>
<name>hbase.mob.delfile.max.count</name>
<value>3</value>
<description>
The max number of del files that are allowed in the mob file compaction.
In the mob file compaction, when the number of existing del files is larger than
this value, they are merged until the number of del files is not larger than this value.
The default value is 3.
</description>
</property>
<property>
<name>hbase.mob.file.compaction.batch.size</name>
<value>100</value>
<description>
The max number of mob files that are allowed in a batch of the mob file compaction.
The mob file compaction merges the small mob files into bigger ones. If the number of the
small files is very large, it could lead to a "too many open file handles" error during
the merge, so the merge has to be split into batches. This value limits the number of mob
files that are selected in a batch of the mob file compaction. The default value is 100.
</description>
</property>
<property>
<name>hbase.master.mob.file.compaction.chore.period</name>
<value>604800000</value>
<description>
The period at which MobFileCompactionChore runs. The unit is milliseconds.
The default value is one week.
</description>
</property>
<property>
<name>hbase.mob.file.compactor.class</name>
<value>org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor</value>
<description>
Implementation of the mob file compactor. The default is PartitionedMobFileCompactor.
</description>
</property>
<property>
<name>hbase.master.mob.file.compaction.chore.threads.max</name>
<value>1</value>
<description>
The max number of threads used in MobFileCompactionChore.
</description>
</property>
</configuration>
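Taken together, these properties control how aggressively mob files are merged. As a minimal sketch of tuning them from client code instead of hbase-site.xml (the class name MobCompactionTuning and the chosen values are illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class MobCompactionTuning {
  public static Configuration tunedConf() {
    Configuration conf = HBaseConfiguration.create();
    // Treat mob files under 256MB as small and mergeable (default is 192MB).
    conf.setLong("hbase.mob.file.compaction.mergeable.threshold", 256L * 1024 * 1024);
    // Tolerate up to 5 del files before merging them (default is 3).
    conf.setInt("hbase.mob.delfile.max.count", 5);
    // Compact at most 50 mob files per batch (default is 100).
    conf.setInt("hbase.mob.file.compaction.batch.size", 50);
    // Run the master chore daily instead of weekly; the unit is milliseconds.
    conf.setInt("hbase.master.mob.file.compaction.chore.period", 24 * 60 * 60 * 1000);
    return conf;
  }
}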

View File

@@ -209,6 +209,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
private MobFileCompactionChore mobFileCompactChore;
MasterCoprocessorHost cpHost;
@@ -613,6 +614,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
Threads.setDaemonThreadRunning(expiredMobFileCleanerChore.getThread());
this.mobFileCompactChore = new MobFileCompactionChore(this);
Threads.setDaemonThreadRunning(mobFileCompactChore.getThread());
if (this.cpHost != null) {
// don't let cp initialization errors kill the master
@@ -863,6 +866,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
if (this.expiredMobFileCleanerChore != null) {
this.expiredMobFileCleanerChore.interrupt();
}
if (this.mobFileCompactChore != null) {
this.mobFileCompactChore.interrupt();
}
if (this.balancerChore != null) {
this.balancerChore.interrupt();
}

View File

@@ -0,0 +1,162 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
/**
* The MobFileCompactionChore regularly runs the mob file compaction to merge small mob files.
*/
@InterfaceAudience.Private
public class MobFileCompactionChore extends Chore {
private static final Log LOG = LogFactory.getLog(MobFileCompactionChore.class);
private HMaster master;
private TableLockManager tableLockManager;
private ExecutorService pool;
public MobFileCompactionChore(HMaster master) {
super(master.getServerName() + "-MobFileCompactChore", master.getConfiguration().getInt(
MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD), master);
this.master = master;
this.tableLockManager = master.getTableLockManager();
this.pool = createThreadPool();
}
@Override
protected void chore() {
try {
String className = master.getConfiguration().get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY,
PartitionedMobFileCompactor.class.getName());
TableDescriptors htds = master.getTableDescriptors();
Map<String, HTableDescriptor> map = htds.getAll();
for (HTableDescriptor htd : map.values()) {
for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
if (!hcd.isMobEnabled()) {
continue;
}
// instantiate the mob file compactor.
MobFileCompactor compactor = null;
try {
compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class,
ExecutorService.class },
new Object[] { master.getConfiguration(), master.getFileSystem(), htd.getTableName(),
hcd, pool });
} catch (Exception e) {
throw new IOException("Unable to load configured mob file compactor '" + className
+ "'", e);
}
// compact only the mob-enabled columns.
// obtain a table write lock before performing the compaction to avoid a race
// condition with the major compaction of the same mob-enabled column.
boolean tableLocked = false;
TableLock lock = null;
try {
// the tableLockManager might be null in testing. In that case, it is lock-free.
if (tableLockManager != null) {
lock = tableLockManager.writeLock(MobUtils.getTableLockName(htd.getTableName()),
"Run MobFileCompactChore");
lock.acquire();
}
tableLocked = true;
compactor.compact();
} catch (Exception e) {
LOG.error("Fail to compact the mob files for the column " + hcd.getNameAsString()
+ " in the table " + htd.getNameAsString(), e);
} finally {
if (lock != null && tableLocked) {
try {
lock.release();
} catch (IOException e) {
LOG.error(
"Fail to release the write lock for the table " + htd.getNameAsString(), e);
}
}
}
}
}
} catch (Exception e) {
LOG.error("Fail to clean the expired mob files", e);
}
}
@Override
protected void cleanup() {
super.cleanup();
pool.shutdown();
}
/**
* Creates a thread pool.
* @return A thread pool.
*/
private ExecutorService createThreadPool() {
Configuration conf = master.getConfiguration();
int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_MAX,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX);
if (maxThreads == 0) {
maxThreads = 1;
}
long keepAliveTime = conf.getLong(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME);
final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// wait for a thread to pick up the task instead of throwing an exception.
queue.put(r);
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
}
}
});
pool.allowCoreThreadTimeOut(true);
return pool;
}
}
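createThreadPool pairs a SynchronousQueue with a RejectedExecutionHandler that re-offers the rejected task through queue.put, so a submitter blocks until a worker frees up instead of receiving a RejectedExecutionException. A self-contained sketch of that pattern, using a hypothetical demo class that is not part of this commit:

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerBlocksPoolDemo {
  public static void main(String[] args) throws InterruptedException {
    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, queue,
        new RejectedExecutionHandler() {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
              // Block the submitting thread until a worker takes the task.
              queue.put(r);
            } catch (InterruptedException e) {
              throw new RejectedExecutionException(e);
            }
          }
        });
    // Only 2 workers exist, so later submissions block here rather than fail.
    for (int i = 0; i < 6; i++) {
      final int n = i;
      pool.execute(new Runnable() {
        @Override
        public void run() {
          System.out.println("task " + n + " ran on " + Thread.currentThread().getName());
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}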

View File

@@ -72,8 +72,51 @@ public class MobConstants {
public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
public final static String TEMP_DIR_NAME = ".tmp";
public final static String BULKLOAD_DIR_NAME = ".bulkload";
public final static byte[] MOB_TABLE_LOCK_SUFFIX = Bytes.toBytes(".mobLock");
public final static String EMPTY_STRING = "";
/**
* If the size of a mob file is less than this value, it's regarded as a small file and needs to
* be merged in mob file compaction. The default value is 192MB.
*/
public static final String MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD =
"hbase.mob.file.compaction.mergeable.threshold";
public static final long DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD = 192 * 1024 * 1024;
/**
* The max number of del files that are allowed in the mob file compaction. When the number
* of existing del files is larger than this value, they are merged until the number of del
* files is not larger than this value. The default value is 3.
*/
public static final String MOB_DELFILE_MAX_COUNT = "hbase.mob.delfile.max.count";
public static final int DEFAULT_MOB_DELFILE_MAX_COUNT = 3;
/**
* The max number of mob files that are allowed in a batch of the mob file compaction.
* The mob file compaction merges the small mob files into bigger ones. If the number of the
* small files is very large, it could lead to a "too many open file handles" error during
* the merge, so the merge has to be split into batches. This value limits the number of mob
* files that are selected in a batch of the mob file compaction. The default value is 100.
*/
public static final String MOB_FILE_COMPACTION_BATCH_SIZE =
"hbase.mob.file.compaction.batch.size";
public static final int DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE = 100;
/**
* The period at which MobFileCompactionChore runs. The unit is milliseconds.
* The default value is one week.
*/
public static final String MOB_FILE_COMPACTION_CHORE_PERIOD =
"hbase.master.mob.file.compaction.chore.period";
public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD =
24 * 60 * 60 * 1000 * 7; // a week
public static final String MOB_FILE_COMPACTOR_CLASS_KEY = "hbase.mob.file.compactor.class";
/**
* The max number of threads used in MobFileCompactionChore.
*/
public static final String MOB_FILE_COMPACTION_CHORE_THREADS_MAX =
"hbase.master.mob.file.compaction.chore.threads.max";
public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX = 1;
public static final String MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME =
"hbase.master.mob.file.compaction.chore.threads.keepalivetime";
public static final long DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME = 60;
private MobConstants() {
}

View File

@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -416,7 +417,7 @@
}
/**
* Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
@@ -435,17 +436,110 @@
throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
.replaceAll("-", ""));
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig);
}
/**
* Creates a writer for the ref file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param cacheConfig The current cache config.
* @return The writer for the mob file.
* @throws IOException
*/
public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig)
throws IOException {
HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true)
.withIncludesTags(true).withCompression(family.getCompactionCompression())
.withCompressTags(family.shouldCompressTags()).withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath)
.withComparator(KeyValue.COMPARATOR).withBloomType(family.getBloomFilterType())
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
return w;
}
/**
* Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param date The date string, its format is yyyymmdd.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @param cacheConfig The current cache config.
* @return The writer for the mob file.
* @throws IOException
*/
public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
.replaceAll("-", ""));
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig);
}
/**
* Creates a writer for the del file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param date The date string, its format is yyyymmdd.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @param cacheConfig The current cache config.
* @return The writer for the del file.
* @throws IOException
*/
public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
throws IOException {
String suffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig);
}
/**
* Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param mobFileName The mob file name.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param cacheConfig The current cache config.
* @return The writer for the mob file.
* @throws IOException
*/
private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
Compression.Algorithm compression, CacheConfig cacheConfig) throws IOException {
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
.withIncludesMvcc(false).withIncludesTags(true).withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withBlockSize(family.getBlocksize())
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
.withFilePath(new Path(basePath, mobFileName.getFileName()))
.withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
return w;
}
@@ -456,12 +550,13 @@
* @param path The path where the mob file is saved.
* @param targetPath The directory path where the source file is renamed to.
* @param cacheConfig The current cache config.
* @return The target file path the source file is renamed to.
* @throws IOException
*/
public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
Path targetPath, CacheConfig cacheConfig) throws IOException {
if (sourceFile == null) {
return null;
}
Path dstPath = new Path(targetPath, sourceFile.getName());
validateMobFile(conf, fs, sourceFile, cacheConfig);
@@ -474,6 +569,7 @@
if (!fs.rename(sourceFile, dstPath)) {
throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
}
return dstPath;
}
/**

View File

@@ -0,0 +1,64 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.filecompactions;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* The compaction request for mob files.
*/
@InterfaceAudience.Private
public abstract class MobFileCompactionRequest {
protected long selectionTime;
protected CompactionType type = CompactionType.PART_FILES;
public void setCompactionType(CompactionType type) {
this.type = type;
}
/**
* Gets the selection time.
* @return The selection time.
*/
public long getSelectionTime() {
return this.selectionTime;
}
/**
* Gets the compaction type.
* @return The compaction type.
*/
public CompactionType getCompactionType() {
return type;
}
protected enum CompactionType {
/**
* Only part of the mob files are selected.
*/
PART_FILES,
/**
* All of the mob files are selected.
*/
ALL_FILES;
}
}

View File

@@ -0,0 +1,78 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.filecompactions;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.FSUtils;
/**
* A mob file compactor to directly compact the mob files.
*/
@InterfaceAudience.Private
public abstract class MobFileCompactor {
protected FileSystem fs;
protected Configuration conf;
protected TableName tableName;
protected HColumnDescriptor column;
protected Path mobTableDir;
protected Path mobFamilyDir;
protected ExecutorService pool;
public MobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
HColumnDescriptor column, ExecutorService pool) {
this.conf = conf;
this.fs = fs;
this.tableName = tableName;
this.column = column;
this.pool = pool;
mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString());
}
/**
* Compacts the mob files for the current column family.
* @return The paths of new mob files generated in the compaction.
* @throws IOException
*/
public List<Path> compact() throws IOException {
return compact(Arrays.asList(fs.listStatus(mobFamilyDir)));
}
/**
* Compacts the candidate mob files.
* @param files The candidate mob files.
* @return The paths of new mob files generated in the compaction.
* @throws IOException
*/
public abstract List<Path> compact(List<FileStatus> files) throws IOException;
}
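The chore instantiates the class named by hbase.mob.file.compactor.class through the (Configuration, FileSystem, TableName, HColumnDescriptor, ExecutorService) constructor, so an alternative compactor only needs to extend MobFileCompactor and keep that signature. A minimal sketch, where NoOpMobFileCompactor is a hypothetical example rather than part of this commit:

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;

public class NoOpMobFileCompactor extends MobFileCompactor {
  // The chore reflectively invokes exactly this constructor signature.
  public NoOpMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
      HColumnDescriptor column, ExecutorService pool) {
    super(conf, fs, tableName, column, pool);
  }

  @Override
  public List<Path> compact(List<FileStatus> files) throws IOException {
    // A real implementation would merge the candidates; this one skips them all.
    return Collections.emptyList();
  }
}

It would be selected with conf.set("hbase.mob.file.compactor.class", NoOpMobFileCompactor.class.getName()).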

View File

@@ -0,0 +1,146 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.filecompactions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* An implementation of {@link MobFileCompactionRequest} that is used in
* {@link PartitionedMobFileCompactor}.
* The mob files that have the same start key and date in their names belong to
* the same partition.
*/
@InterfaceAudience.Private
public class PartitionedMobFileCompactionRequest extends MobFileCompactionRequest {
protected Collection<FileStatus> delFiles;
protected Collection<CompactionPartition> compactionPartitions;
public PartitionedMobFileCompactionRequest(Collection<CompactionPartition> compactionPartitions,
Collection<FileStatus> delFiles) {
this.selectionTime = EnvironmentEdgeManager.currentTime();
this.compactionPartitions = compactionPartitions;
this.delFiles = delFiles;
}
/**
* Gets the compaction partitions.
* @return The compaction partitions.
*/
public Collection<CompactionPartition> getCompactionPartitions() {
return this.compactionPartitions;
}
/**
* Gets the del files.
* @return The del files.
*/
public Collection<FileStatus> getDelFiles() {
return this.delFiles;
}
/**
* The partition in the mob file compaction.
* The mob files that have the same start key and date in their names belong to
* the same partition.
*/
protected static class CompactionPartition {
private List<FileStatus> files = new ArrayList<FileStatus>();
private CompactionPartitionId partitionId;
public CompactionPartition(CompactionPartitionId partitionId) {
this.partitionId = partitionId;
}
public CompactionPartitionId getPartitionId() {
return this.partitionId;
}
public void addFile(FileStatus file) {
files.add(file);
}
public List<FileStatus> listFiles() {
return Collections.unmodifiableList(files);
}
}
/**
* The partition id that consists of start key and date of the mob file name.
*/
protected static class CompactionPartitionId {
private String startKey;
private String date;
public CompactionPartitionId(String startKey, String date) {
if (startKey == null || date == null) {
throw new IllegalArgumentException("Neither of start key and date could be null");
}
this.startKey = startKey;
this.date = date;
}
public String getStartKey() {
return this.startKey;
}
public String getDate() {
return this.date;
}
@Override
public int hashCode() {
int result = 17;
result = 31 * result + startKey.hashCode();
result = 31 * result + date.hashCode();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof CompactionPartitionId)) {
return false;
}
CompactionPartitionId another = (CompactionPartitionId) obj;
if (!this.startKey.equals(another.startKey)) {
return false;
}
if (!this.date.equals(another.date)) {
return false;
}
return true;
}
@Override
public String toString() {
return new StringBuilder(startKey).append(date).toString();
}
}
}
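Because CompactionPartitionId implements value-based equals and hashCode over (startKey, date), files whose names parse to the same pair resolve to the same map entry during selection. A small sketch of that contract; since the nested classes are protected, this only compiles in the same package, and the key values are illustrative:

package org.apache.hadoop.hbase.mob.filecompactions;

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;

public class PartitionIdDemo {
  public static void main(String[] args) {
    CompactionPartitionId first = new CompactionPartitionId("d41d8cd9", "20150206");
    CompactionPartitionId second = new CompactionPartitionId("d41d8cd9", "20150206");
    Map<CompactionPartitionId, CompactionPartition> partitions =
        new HashMap<CompactionPartitionId, CompactionPartition>();
    partitions.put(first, new CompactionPartition(first));
    // Value-based equality: the second id finds the partition created for the first.
    System.out.println(first.equals(second));           // true
    System.out.println(partitions.containsKey(second)); // true
  }
}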

View File

@@ -0,0 +1,631 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.filecompactions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
/**
* An implementation of {@link MobFileCompactor} that compacts the mob files in partitions.
*/
@InterfaceAudience.Private
public class PartitionedMobFileCompactor extends MobFileCompactor {
private static final Log LOG = LogFactory.getLog(PartitionedMobFileCompactor.class);
protected long mergeableSize;
protected int delFileMaxCount;
/** The number of files compacted in a batch */
protected int compactionBatchSize;
protected int compactionKVMax;
private Path tempPath;
private Path bulkloadPath;
private CacheConfig compactionCacheConfig;
private Tag tableNameTag;
public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
HColumnDescriptor column, ExecutorService pool) {
super(conf, fs, tableName, column, pool);
mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
// default is 100
compactionBatchSize = conf.getInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME,
tableName.getNameAsString()));
compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
HConstants.COMPACTION_KV_MAX_DEFAULT);
Configuration copyOfConf = new Configuration(conf);
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
compactionCacheConfig = new CacheConfig(copyOfConf);
tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
}
@Override
public List<Path> compact(List<FileStatus> files) throws IOException {
if (files == null || files.isEmpty()) {
return null;
}
// find the files to compact.
PartitionedMobFileCompactionRequest request = select(files);
// compact the files.
return performCompaction(request);
}
/**
* Selects the mob files and del files to compact.
* Iterates over the candidates to find all the del files and small mob files.
* @param candidates All the candidates.
* @return A compaction request.
* @throws IOException
*/
protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates)
throws IOException {
Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
Map<CompactionPartitionId, CompactionPartition> filesToCompact =
new HashMap<CompactionPartitionId, CompactionPartition>();
int selectedFileCount = 0;
int irrelevantFileCount = 0;
for (FileStatus file : candidates) {
if (!file.isFile()) {
irrelevantFileCount++;
continue;
}
// group the del files and small files.
FileStatus linkedFile = file;
if (HFileLink.isHFileLink(file.getPath())) {
HFileLink link = new HFileLink(conf, file.getPath());
linkedFile = getLinkedFileStatus(link);
if (linkedFile == null) {
// If the linked file cannot be found, regard it as an irrelevant file
irrelevantFileCount++;
continue;
}
}
if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
allDelFiles.add(file);
} else if (linkedFile.getLen() < mergeableSize) {
// add the small files to the merge pool
MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
fileName.getDate());
CompactionPartition compactionPartition = filesToCompact.get(id);
if (compactionPartition == null) {
compactionPartition = new CompactionPartition(id);
compactionPartition.addFile(file);
filesToCompact.put(id, compactionPartition);
} else {
compactionPartition.addFile(file);
}
selectedFileCount++;
}
}
PartitionedMobFileCompactionRequest request = new PartitionedMobFileCompactionRequest(
filesToCompact.values(), allDelFiles);
if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
// all the files are selected
request.setCompactionType(CompactionType.ALL_FILES);
}
return request;
}
/**
* Performs the compaction on the selected files.
* <ol>
* <li>Compacts the del files.</li>
* <li>Compacts the selected small mob files and all the del files.</li>
* <li>If all the candidates are selected, delete the del files.</li>
* </ol>
* @param request The compaction request.
* @return The paths of new mob files generated in the compaction.
* @throws IOException
*/
protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
throws IOException {
// merge the del files
List<Path> delFilePaths = new ArrayList<Path>();
for (FileStatus delFile : request.delFiles) {
delFilePaths.add(delFile.getPath());
}
List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
for (Path newDelPath : newDelPaths) {
StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
newDelFiles.add(sf);
}
// compact the mob files by partitions.
List<Path> paths = compactMobFiles(request, newDelFiles);
// archive the del files if all the mob files are selected.
if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
try {
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
} catch (IOException e) {
LOG.error("Failed to archive the del files " + newDelFiles, e);
}
}
return paths;
}
/**
* Compacts the selected small mob files and all the del files.
* @param request The compaction request.
* @param delFiles The del files.
* @return The paths of new mob files after compactions.
* @throws IOException
*/
protected List<Path> compactMobFiles(final PartitionedMobFileCompactionRequest request,
final List<StoreFile> delFiles) throws IOException {
Collection<CompactionPartition> partitions = request.compactionPartitions;
if (partitions == null || partitions.isEmpty()) {
return Collections.emptyList();
}
List<Path> paths = new ArrayList<Path>();
final HTable table = new HTable(conf, tableName);
try {
Map<CompactionPartitionId, Future<List<Path>>> results =
new HashMap<CompactionPartitionId, Future<List<Path>>>();
// compact the mob files by partitions in parallel.
for (final CompactionPartition partition : partitions) {
results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
@Override
public List<Path> call() throws Exception {
return compactMobFilePartition(request, partition, delFiles, table);
}
}));
}
// collect the results of the parallel partition compactions.
boolean hasFailure = false;
for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
try {
paths.addAll(result.getValue().get());
} catch (Exception e) {
// just log the error
LOG.error("Failed to compact the partition " + result.getKey(), e);
hasFailure = true;
}
}
if (hasFailure) {
// if any partition fails in the compaction, directly throw an exception.
throw new IOException("Failed to compact the partitions");
}
} finally {
try {
table.close();
} catch (IOException e) {
LOG.error("Failed to close the HTable", e);
}
}
return paths;
}
/**
* Compacts a partition of selected small mob files and all the del files.
* @param request The compaction request.
* @param partition A compaction partition.
* @param delFiles The del files.
* @param table The current table.
* @return The paths of new mob files after compactions.
* @throws IOException
*/
private List<Path> compactMobFilePartition(PartitionedMobFileCompactionRequest request,
CompactionPartition partition, List<StoreFile> delFiles, HTable table) throws IOException {
List<Path> newFiles = new ArrayList<Path>();
List<FileStatus> files = partition.listFiles();
int offset = 0;
Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
while (offset < files.size()) {
int batch = compactionBatchSize;
if (files.size() - offset < compactionBatchSize) {
batch = files.size() - offset;
}
if (batch == 1 && delFiles.isEmpty()) {
// only one file left and no del files, do not compact it,
// and directly add it to the new files.
newFiles.add(files.get(offset).getPath());
offset++;
continue;
}
// clean the bulkload directory to avoid loading old files.
fs.delete(bulkloadPathOfPartition, true);
// add the selected mob files and del files into filesToCompact
List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
for (int i = offset; i < batch + offset; i++) {
StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
BloomType.NONE);
filesToCompact.add(sf);
}
filesToCompact.addAll(delFiles);
// compact the mob files in a batch.
compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
// move to the next batch.
offset += batch;
}
return newFiles;
}
/**
* Compacts a partition of selected small mob files and all the del files in a batch.
* @param request The compaction request.
* @param partition A compaction partition.
* @param table The current table.
* @param filesToCompact The files to be compacted.
* @param batch The number of mob files to be compacted in a batch.
* @param bulkloadPathOfPartition The directory where the bulkload column of the current
* partition is saved.
* @param bulkloadColumnPath The directory where the bulkload files of current partition
* are saved.
* @param newFiles The paths of new mob files after compactions.
* @throws IOException
*/
private void compactMobFilesInBatch(PartitionedMobFileCompactionRequest request,
CompactionPartition partition, HTable table, List<StoreFile> filesToCompact, int batch,
Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
throws IOException {
// open scanner to the selected mob files and del files.
StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
// the mob files to be compacted, not including the del files.
List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
// Pair(maxSeqId, cellsCount)
Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
// open writers for the mob files and new ref store files.
Writer writer = null;
Writer refFileWriter = null;
Path filePath = null;
Path refFilePath = null;
long mobCells = 0;
try {
writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
.getStartKey(), compactionCacheConfig);
filePath = writer.getPath();
byte[] fileName = Bytes.toBytes(filePath.getName());
// create a temp file and open a writer for it in the bulkloadPath
refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
.getSecond().longValue(), compactionCacheConfig);
refFilePath = refFileWriter.getPath();
List<Cell> cells = new ArrayList<Cell>();
boolean hasMore = false;
do {
hasMore = scanner.next(cells, compactionKVMax);
for (Cell cell : cells) {
// TODO remove this after the new code is introduced.
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
// write the mob cell to the mob file.
writer.append(kv);
// write the new reference cell to the store file.
KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
refFileWriter.append(reference);
mobCells++;
}
cells.clear();
} while (hasMore);
} finally {
// close the scanner.
scanner.close();
// append metadata to the mob file, and close the mob file writer.
closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
// append metadata and bulkload info to the ref mob file, and close the writer.
closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
}
if (mobCells > 0) {
// commit mob file
MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
// bulkload the ref file
bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
newFiles.add(new Path(mobFamilyDir, filePath.getName()));
} else {
// remove the new files
// the mob file is empty, delete it instead of committing.
deletePath(filePath);
// the ref file is empty, delete it instead of committing.
deletePath(refFilePath);
}
// archive the old mob files, do not archive the del files.
try {
MobUtils
.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
} catch (IOException e) {
LOG.error("Failed to archive the files " + mobFilesToCompact, e);
}
}
/**
* Compacts the del files in batches, which avoids opening too many files at once.
* @param request The compaction request.
* @param delFilePaths The paths of the del files.
* @return The paths of new del files after merging or the original files if no merging
* is necessary.
* @throws IOException
*/
protected List<Path> compactDelFiles(PartitionedMobFileCompactionRequest request,
List<Path> delFilePaths) throws IOException {
if (delFilePaths.size() <= delFileMaxCount) {
return delFilePaths;
}
// when there are more del files than the number that is allowed, merge them first.
int offset = 0;
List<Path> paths = new ArrayList<Path>();
while (offset < delFilePaths.size()) {
// get the batch
int batch = compactionBatchSize;
if (delFilePaths.size() - offset < compactionBatchSize) {
batch = delFilePaths.size() - offset;
}
List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
if (batch == 1) {
// only one file left, do not compact it, directly add it to the new files.
paths.add(delFilePaths.get(offset));
offset++;
continue;
}
for (int i = offset; i < batch + offset; i++) {
batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
BloomType.NONE));
}
// compact the del files in a batch.
paths.add(compactDelFilesInBatch(request, batchedDelFiles));
// move to the next batch.
offset += batch;
}
return compactDelFiles(request, paths);
}
/**
* Compacts the del files in a batch.
* @param request The compaction request.
* @param delFiles The del files.
* @return The path of new del file after merging.
* @throws IOException
*/
private Path compactDelFilesInBatch(PartitionedMobFileCompactionRequest request,
List<StoreFile> delFiles) throws IOException {
// create a scanner for the del files.
StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
Writer writer = null;
Path filePath = null;
try {
writer = MobUtils.createDelFileWriter(conf, fs, column,
MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig);
filePath = writer.getPath();
List<Cell> cells = new ArrayList<Cell>();
boolean hasMore = false;
do {
hasMore = scanner.next(cells, compactionKVMax);
for (Cell cell : cells) {
// TODO remove this after the new code is introduced.
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
writer.append(kv);
}
cells.clear();
} while (hasMore);
} finally {
scanner.close();
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
LOG.error("Failed to close the writer of the file " + filePath, e);
}
}
}
// commit the new del file
Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
// archive the old del files
try {
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
} catch (IOException e) {
LOG.error("Failed to archive the old del files " + delFiles, e);
}
return path;
}
/**
* Creates a store scanner.
* @param filesToCompact The files to be compacted.
* @param scanType The scan type.
* @return The store scanner.
* @throws IOException
*/
private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
throws IOException {
List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
false, true, false, null, HConstants.LATEST_TIMESTAMP);
Scan scan = new Scan();
scan.setMaxVersions(column.getMaxVersions());
long ttl = HStore.determineTTLFromFamily(column);
ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR);
StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
HConstants.LATEST_TIMESTAMP);
return scanner;
}
/**
* Bulkloads the current file.
* @param table The current table.
* @param bulkloadDirectory The path of bulkload directory.
* @param fileName The current file name.
* @throws IOException
*/
private void bulkloadRefFile(HTable table, Path bulkloadDirectory, String fileName)
throws IOException {
// bulkload the ref file
try {
LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
bulkload.doBulkLoad(bulkloadDirectory, table);
} catch (Exception e) {
// delete the committed mob file
deletePath(new Path(mobFamilyDir, fileName));
throw new IOException(e);
} finally {
// delete the bulkload files in bulkloadPath
deletePath(bulkloadDirectory);
}
}
/**
* Closes the mob file writer.
* @param writer The mob file writer.
* @param maxSeqId Maximum sequence id.
* @param mobCellsCount The number of mob cells.
* @throws IOException
*/
private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
throws IOException {
if (writer != null) {
writer.appendMetadata(maxSeqId, false, mobCellsCount);
try {
writer.close();
} catch (IOException e) {
LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
}
}
}
/**
* Closes the ref file writer.
* @param writer The ref file writer.
* @param maxSeqId Maximum sequence id.
* @param bulkloadTime The timestamp at which the bulk load file is created.
* @throws IOException
*/
private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
throws IOException {
if (writer != null) {
writer.appendMetadata(maxSeqId, false);
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
try {
writer.close();
} catch (IOException e) {
LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
}
}
}
/**
* Gets the max seqId and number of cells of the store files.
* @param storeFiles The store files.
* @return The pair of the max seqId and number of cells of the store files.
* @throws IOException
*/
private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
long maxSeqId = 0;
long maxKeyCount = 0;
for (StoreFile sf : storeFiles) {
// the readers will be closed later after the merge.
maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
if (count != null) {
maxKeyCount += Bytes.toLong(count);
}
}
return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
}
/**
* Deletes a file.
* @param path The path of the file to be deleted.
*/
private void deletePath(Path path) {
try {
if (path != null) {
fs.delete(path, true);
}
} catch (IOException e) {
LOG.error("Failed to delete the file " + path, e);
}
}
private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
Path[] locations = link.getLocations();
for (Path location : locations) {
FileStatus file = getFileStatus(location);
if (file != null) {
return file;
}
}
return null;
}
private FileStatus getFileStatus(Path path) throws IOException {
try {
if (path != null) {
FileStatus file = fs.getFileStatus(path);
return file;
}
} catch (FileNotFoundException e) {
LOG.warn("The file " + path + " can not be found", e);
}
return null;
}
}
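Outside the master chore, the compactor can also be driven directly, which is what the test below does. A condensed usage sketch (the table name, family, and pool size are placeholders):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;

public class RunMobFileCompaction {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    HColumnDescriptor family = new HColumnDescriptor("family1");
    family.setMobEnabled(true);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
          TableName.valueOf("testMob"), family, pool);
      // Compacts every candidate file under the family's mob directory.
      List<Path> newFiles = compactor.compact();
      System.out.println("New mob files: " + newFiles);
    } finally {
      pool.shutdown();
    }
  }
}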

View File

@@ -363,7 +363,7 @@ public class HStore implements Store {
* @param family
* @return TTL in seconds of the specified family
*/
static long determineTTLFromFamily(final HColumnDescriptor family) {
public static long determineTTLFromFamily(final HColumnDescriptor family) {
// HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
long ttl = family.getTimeToLive();
if (ttl == HConstants.FOREVER) {

View File

@@ -272,8 +272,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// 0 is passed as readpoint because the test bypasses Store
0);
}
StoreScanner(final Scan scan, ScanInfo scanInfo,
public StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
throws IOException {

View File

@@ -0,0 +1,652 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.filecompactions;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestMobFileCompactor {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private Configuration conf = null;
private String tableNameAsString;
private TableName tableName;
private static HTable hTable;
private static Admin admin;
private static HTableDescriptor desc;
private static HColumnDescriptor hcd1;
private static HColumnDescriptor hcd2;
private static FileSystem fs;
private final static String family1 = "family1";
private final static String family2 = "family2";
private final static String qf1 = "qualifier1";
private final static String qf2 = "qualifier2";
private static byte[] KEYS = Bytes.toBytes("012");
private static int regionNum = KEYS.length;
private static int delRowNum = 1;
private static int delCellNum = 6;
private static int cellNumPerRow = 3;
private static int rowNumPerFile = 2;
private static ExecutorService pool;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
TEST_UTIL.startMiniCluster(1);
pool = createThreadPool(TEST_UTIL.getConfiguration());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
pool.shutdown();
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws Exception {
fs = TEST_UTIL.getTestFileSystem();
conf = TEST_UTIL.getConfiguration();
long tid = System.currentTimeMillis();
tableNameAsString = "testMob" + tid;
tableName = TableName.valueOf(tableNameAsString);
hcd1 = new HColumnDescriptor(family1);
hcd1.setMobEnabled(true);
hcd1.setMobThreshold(0L);
hcd1.setMaxVersions(4);
hcd2 = new HColumnDescriptor(family2);
hcd2.setMobEnabled(true);
hcd2.setMobThreshold(0L);
hcd2.setMaxVersions(4);
desc = new HTableDescriptor(tableName);
desc.addFamily(hcd1);
desc.addFamily(hcd2);
admin = TEST_UTIL.getHBaseAdmin();
admin.createTable(desc, getSplitKeys());
hTable = new HTable(conf, tableNameAsString);
hTable.setAutoFlush(false, false);
}
@After
public void tearDown() throws Exception {
admin.disableTable(tableName);
admin.deleteTable(tableName);
admin.close();
hTable.close();
fs.delete(TEST_UTIL.getDataTestDir(), true);
}
@Test
public void testCompactionWithoutDelFiles() throws Exception {
resetConf();
int count = 4;
// generate mob files
loadData(count, rowNumPerFile);
int rowNumPerRegion = count*rowNumPerFile;
assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion,
countMobRows(hTable));
assertEquals("Before compaction: mob file count", regionNum*count, countFiles(true, family1));
assertEquals("Before compaction: del file count", 0, countFiles(false, family1));
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
compactor.compact();
assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
countMobRows(hTable));
assertEquals("After compaction: mob file count", regionNum, countFiles(true, family1));
assertEquals("After compaction: del file count", 0, countFiles(false, family1));
}
@Test
public void testCompactionWithDelFiles() throws Exception {
resetConf();
int count = 4;
// generate mob files
loadData(count, rowNumPerFile);
int rowNumPerRegion = count*rowNumPerFile;
assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
countMobRows(hTable));
assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
countMobCells(hTable));
assertEquals("Before deleting: family1 mob file count", regionNum*count,
countFiles(true, family1));
assertEquals("Before deleting: family2 mob file count", regionNum*count,
countFiles(true, family2));
createDelFile();
assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("Before compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
assertEquals("Before compaction: family1 mob file count", regionNum*count,
countFiles(true, family1));
assertEquals("Before compaction: family2 file count", regionNum*count,
countFiles(true, family2));
assertEquals("Before compaction: family1 del file count", regionNum,
countFiles(false, family1));
assertEquals("Before compaction: family2 del file count", regionNum,
countFiles(false, family2));
// do the mob file compaction
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
compactor.compact();
assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("After compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
assertEquals("After compaction: family1 mob file count", regionNum,
countFiles(true, family1));
assertEquals("After compaction: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
assertEquals("After compaction: family2 del file count", regionNum,
countFiles(false, family2));
assertRefFileNameEqual(family1);
}
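/**
* Asserts that the file names referenced by the mob cells match the mob files
* (excluding del files) that actually exist in the mob family directory.
*/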
private void assertRefFileNameEqual(String familyName) throws IOException {
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(familyName));
// Do not retrieve the mob data when scanning
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
ResultScanner results = hTable.getScanner(scan);
Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
tableName), familyName);
List<Path> actualFilePaths = new ArrayList<>();
List<Path> expectFilePaths = new ArrayList<>();
for (Result res : results) {
for (Cell cell : res.listCells()) {
byte[] referenceValue = CellUtil.cloneValue(cell);
String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
referenceValue.length - Bytes.SIZEOF_INT);
Path targetPath = new Path(mobFamilyPath, fileName);
if (!actualFilePaths.contains(targetPath)) {
actualFilePaths.add(targetPath);
}
}
}
results.close();
if (fs.exists(mobFamilyPath)) {
FileStatus[] files = fs.listStatus(mobFamilyPath);
for (FileStatus file : files) {
if (!StoreFileInfo.isDelFile(file.getPath())) {
expectFilePaths.add(file.getPath());
}
}
}
Collections.sort(actualFilePaths);
Collections.sort(expectFilePaths);
assertEquals(expectFilePaths, actualFilePaths);
}
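// With a lowered mergeable threshold, only the files smaller than the threshold
// should be merged; larger files are expected to be left as they are.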
@Test
public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception {
resetConf();
int mergeSize = 5000;
// change the mob compaction merge size
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
int count = 4;
// generate mob files
loadData(count, rowNumPerFile);
int rowNumPerRegion = count*rowNumPerFile;
assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
countMobRows(hTable));
assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
countMobCells(hTable));
assertEquals("Before deleting: mob file count", regionNum*count, countFiles(true, family1));
int largeFilesCount = countLargeFiles(mergeSize, family1);
createDelFile();
assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("Before compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
assertEquals("Before compaction: family1 mob file count", regionNum*count,
countFiles(true, family1));
assertEquals("Before compaction: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("Before compaction: family1 del file count", regionNum,
countFiles(false, family1));
assertEquals("Before compaction: family2 del file count", regionNum,
countFiles(false, family2));
// do the mob file compaction
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
compactor.compact();
assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("After compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
// After the compaction, the files smaller than the mob compaction merge size
// are merged into one file per region
assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
countFiles(true, family1));
assertEquals("After compaction: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("After compaction: family1 del file count", regionNum,
countFiles(false, family1));
assertEquals("After compaction: family2 del file count", regionNum,
countFiles(false, family2));
}
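// With a batch size of 2, each region's 4 mob files are compacted in two
// batches, so two mob files per region are expected instead of one.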
@Test
public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() throws Exception {
resetConf();
int batchSize = 2;
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize);
int count = 4;
// generate mob files
loadData(count, rowNumPerFile);
int rowNumPerRegion = count*rowNumPerFile;
assertEquals("Before deleting: mob row count", regionNum*rowNumPerRegion,
countMobRows(hTable));
assertEquals("Before deleting: family1 mob file count", regionNum*count,
countFiles(true, family1));
assertEquals("Before deleting: family2 mob file count", regionNum*count,
countFiles(true, family2));
createDelFile();
assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("Before compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
assertEquals("Before compaction: family1 mob file count", regionNum*count,
countFiles(true, family1));
assertEquals("Before compaction: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("Before compaction: family1 del file count", regionNum,
countFiles(false, family1));
assertEquals("Before compaction: family2 del file count", regionNum,
countFiles(false, family2));
// do the mob file compaction
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
compactor.compact();
assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("After compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
assertEquals("After compaction: family1 mob file count", regionNum*(count/batchSize),
countFiles(true, family1));
assertEquals("After compaction: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
assertEquals("After compaction: family2 del file count", regionNum,
countFiles(false, family2));
}
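// The first compaction archives the original mob files of family1, so restoring
// the snapshot recreates them as HFileLinks; the second compaction must be able
// to read through those links and replace them with real files again.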
@Test
public void testCompactionWithHFileLink() throws IOException, InterruptedException {
resetConf();
int count = 4;
// generate mob files
loadData(count, rowNumPerFile);
int rowNumPerRegion = count*rowNumPerFile;
long tid = System.currentTimeMillis();
byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
// take a snapshot
admin.snapshot(snapshotName1, tableName);
createDelFile();
assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("Before compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
assertEquals("Before compaction: family1 mob file count", regionNum*count,
countFiles(true, family1));
assertEquals("Before compaction: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("Before compaction: family1 del file count", regionNum,
countFiles(false, family1));
assertEquals("Before compaction: family2 del file count", regionNum,
countFiles(false, family2));
// do the mob file compaction
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
compactor.compact();
assertEquals("After first compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("After first compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
assertEquals("After first compaction: family1 mob file count", regionNum,
countFiles(true, family1));
assertEquals("After first compaction: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("After first compaction: family1 del file count", 0, countFiles(false, family1));
assertEquals("After first compaction: family2 del file count", regionNum,
countFiles(false, family2));
assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1));
assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2));
admin.disableTable(tableName);
// Restore from snapshot, the hfilelink will exist in mob dir
admin.restoreSnapshot(snapshotName1);
admin.enableTable(tableName);
assertEquals("After restoring snapshot: mob rows count", regionNum*rowNumPerRegion,
countMobRows(hTable));
assertEquals("After restoring snapshot: mob cells count",
regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
assertEquals("After restoring snapshot: family1 mob file count", regionNum*count,
countFiles(true, family1));
assertEquals("After restoring snapshot: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("After restoring snapshot: family1 del file count", 0,
countFiles(false, family1));
assertEquals("After restoring snapshot: family2 del file count", 0,
countFiles(false, family2));
assertEquals("After restoring snapshot: family1 hfilelink count", regionNum*count,
countHFileLinks(family1));
assertEquals("After restoring snapshot: family2 hfilelink count", 0,
countHFileLinks(family2));
compactor.compact();
assertEquals("After second compaction: mob rows count", regionNum*rowNumPerRegion,
countMobRows(hTable));
assertEquals("After second compaction: mob cells count",
regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
assertEquals("After second compaction: family1 mob file count", regionNum,
countFiles(true, family1));
assertEquals("After second compaction: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("After second compaction: family1 del file count", 0, countFiles(false, family1));
assertEquals("After second compaction: family2 del file count", 0, countFiles(false, family2));
assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
}
/**
* Gets the number of rows in the given table.
* @param table the table to scan
* @return the number of rows
*/
private int countMobRows(final HTable table) throws IOException {
Scan scan = new Scan();
// Do not retrieve the mob data when scanning
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
ResultScanner results = table.getScanner(scan);
int count = 0;
for (Result res : results) {
count++;
}
results.close();
return count;
}
/**
* Gets the number of cells in the given table.
* @param table the table to scan
* @return the number of cells
*/
private int countMobCells(final HTable table) throws IOException {
Scan scan = new Scan();
// Do not retrieve the mob data when scanning
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
ResultScanner results = table.getScanner(scan);
int count = 0;
for (Result res : results) {
for (Cell cell : res.listCells()) {
count++;
}
}
results.close();
return count;
}
/**
* Gets the number of files in the mob path.
* @param isMobFile true to count mob files, false to count del files
* @param familyName the family name
* @return the number of the files
*/
private int countFiles(boolean isMobFile, String familyName) throws IOException {
Path mobDirPath = MobUtils.getMobFamilyPath(
MobUtils.getMobRegionPath(conf, tableName), familyName);
int count = 0;
if (fs.exists(mobDirPath)) {
FileStatus[] files = fs.listStatus(mobDirPath);
for (FileStatus file : files) {
if (isMobFile) {
if (!StoreFileInfo.isDelFile(file.getPath())) {
count++;
}
} else {
if (StoreFileInfo.isDelFile(file.getPath())) {
count++;
}
}
}
}
return count;
}
/**
* Gets the number of HFileLink in the mob path.
* @param familyName the family name
* @return the number of the HFileLink
*/
private int countHFileLinks(String familyName) throws IOException {
Path mobDirPath = MobUtils.getMobFamilyPath(
MobUtils.getMobRegionPath(conf, tableName), familyName);
int count = 0;
if (fs.exists(mobDirPath)) {
FileStatus[] files = fs.listStatus(mobDirPath);
for (FileStatus file : files) {
if (HFileLink.isHFileLink(file.getPath())) {
count++;
}
}
}
return count;
}
/**
* Gets the number of large files.
* @param size the threshold size in bytes
* @param familyName the family name
* @return the number of files larger than the size
*/
private int countLargeFiles(int size, String familyName) throws IOException {
Path mobDirPath = MobUtils.getMobFamilyPath(
MobUtils.getMobRegionPath(conf, tableName), familyName);
int count = 0;
if (fs.exists(mobDirPath)) {
FileStatus[] files = fs.listStatus(mobDirPath);
for (FileStatus file : files) {
// ignore the del files in the mob path
if ((!StoreFileInfo.isDelFile(file.getPath()))
&& (file.getLen() > size)) {
count++;
}
}
}
return count;
}
/**
* Loads data into the table.
* @param fileNum the number of mob files per region
* @param rowNumPerFile the number of rows per file
*/
private void loadData(int fileNum, int rowNumPerFile) throws IOException,
InterruptedException {
if (fileNum <= 0) {
throw new IllegalArgumentException();
}
for (byte k0 : KEYS) {
byte[] k = new byte[] { k0 };
for (int i = 0; i < fileNum * rowNumPerFile; i++) {
byte[] key = Bytes.add(k, Bytes.toBytes(i));
byte[] mobVal = makeDummyData(10 * (i + 1));
Put put = new Put(key);
put.setDurability(Durability.SKIP_WAL);
put.add(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
put.add(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
put.add(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
hTable.put(put);
if ((i + 1) % rowNumPerFile == 0) {
hTable.flushCommits();
admin.flush(tableName);
}
}
}
}
/**
* Deletes a row, a family and a cell to create del files.
*/
private void createDelFile() throws IOException, InterruptedException {
for (byte k0 : KEYS) {
byte[] k = new byte[] { k0 };
// delete a family
byte[] key1 = Bytes.add(k, Bytes.toBytes(0));
Delete delete1 = new Delete(key1);
delete1.deleteFamily(Bytes.toBytes(family1));
hTable.delete(delete1);
// delete one row
byte[] key2 = Bytes.add(k, Bytes.toBytes(2));
Delete delete2 = new Delete(key2);
hTable.delete(delete2);
// delete one cell
byte[] key3 = Bytes.add(k, Bytes.toBytes(4));
Delete delete3 = new Delete(key3);
delete3.deleteColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1));
hTable.delete(delete3);
hTable.flushCommits();
admin.flush(tableName);
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(
Bytes.toBytes(tableNameAsString));
for (HRegion region : regions) {
region.waitForFlushesAndCompactions();
region.compactStores(true);
}
}
}
/**
* Creates the dummy data with a specific size.
* @param size the size of the data
* @return the dummy data
*/
private byte[] makeDummyData(int size) {
byte[] dummyData = new byte[size];
new Random().nextBytes(dummyData);
return dummyData;
}
/**
* Gets the split keys
*/
public static byte[][] getSplitKeys() {
byte[][] splitKeys = new byte[KEYS.length - 1][];
for (int i = 0; i < splitKeys.length; ++i) {
splitKeys[i] = new byte[] { KEYS[i + 1] };
}
return splitKeys;
}
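// Creates a pool backed by a synchronous queue whose rejection handler blocks,
// so submissions wait for a free worker thread instead of failing.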
private static ExecutorService createThreadPool(Configuration conf) {
int maxThreads = 10;
long keepAliveTime = 60;
final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
keepAliveTime, TimeUnit.SECONDS, queue,
Threads.newDaemonThreadFactory("MobFileCompactionChore"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// waiting for a thread to pick up instead of throwing exceptions.
queue.put(r);
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
}
}
});
pool.allowCoreThreadTimeOut(true);
return pool;
}
/**
* Resets the configuration.
*/
private void resetConf() {
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
}
}

View File

@ -0,0 +1,60 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.filecompactions;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestPartitionedMobFileCompactionRequest {
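// CompactionPartitionId equality should take both the start key and the date
// into account.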
@Test
public void testCompactedPartitionId() {
String startKey1 = "startKey1";
String startKey2 = "startKey2";
String date1 = "date1";
String date2 = "date2";
CompactionPartitionId partitionId1 = new CompactionPartitionId(startKey1, date1);
CompactionPartitionId partitionId2 = new CompactionPartitionId(startKey2, date2);
CompactionPartitionId partitionId3 = new CompactionPartitionId(startKey1, date2);
Assert.assertTrue(partitionId1.equals(partitionId1));
Assert.assertFalse(partitionId1.equals(partitionId2));
Assert.assertFalse(partitionId1.equals(partitionId3));
Assert.assertFalse(partitionId2.equals(partitionId3));
Assert.assertEquals(startKey1, partitionId1.getStartKey());
Assert.assertEquals(date1, partitionId1.getDate());
}
@Test
public void testCompactedPartition() {
CompactionPartitionId partitionId = new CompactionPartitionId("startKey1", "date1");
CompactionPartition partition = new CompactionPartition(partitionId);
FileStatus file = new FileStatus(1, false, 1, 1024, 1, new Path("/test"));
partition.addFile(file);
Assert.assertEquals(file, partition.listFiles().get(0));
}
}

View File

@ -0,0 +1,423 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.filecompactions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestPartitionedMobFileCompactor {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static String family = "family";
private final static String qf = "qf";
private HColumnDescriptor hcd = new HColumnDescriptor(family);
private Configuration conf = TEST_UTIL.getConfiguration();
private CacheConfig cacheConf = new CacheConfig(conf);
private FileSystem fs;
private List<FileStatus> mobFiles = new ArrayList<>();
private List<FileStatus> delFiles = new ArrayList<>();
private List<FileStatus> allFiles = new ArrayList<>();
private Path basePath;
private String mobSuffix;
private String delSuffix;
private static ExecutorService pool;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
TEST_UTIL.startMiniCluster(1);
pool = createThreadPool(TEST_UTIL.getConfiguration());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
pool.shutdown();
TEST_UTIL.shutdownMiniCluster();
}
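/**
* Points basePath at the table's mob family directory and generates fresh
* file name suffixes for the mob and del files of a test.
*/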
private void init(String tableName) throws Exception {
fs = FileSystem.get(conf);
Path testDir = FSUtils.getRootDir(conf);
Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
basePath = new Path(new Path(mobTestDir, tableName), family);
mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
}
@Test
public void testCompactionSelectWithAllFiles() throws Exception {
resetConf();
String tableName = "testCompactionSelectWithAllFiles";
init(tableName);
int count = 10;
// create 10 mob files.
createStoreFiles(basePath, family, qf, count, Type.Put);
// create 10 del files
createStoreFiles(basePath, family, qf, count, Type.Delete);
listFiles();
long mergeSize = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD;
List<String> expectedStartKeys = new ArrayList<>();
for (FileStatus file : mobFiles) {
if (file.getLen() < mergeSize) {
String fileName = file.getPath().getName();
String startKey = fileName.substring(0, 32);
expectedStartKeys.add(startKey);
}
}
testSelectFiles(tableName, CompactionType.ALL_FILES, expectedStartKeys);
}
@Test
public void testCompactionSelectWithPartFiles() throws Exception {
resetConf();
String tableName = "testCompactionSelectWithPartFiles";
init(tableName);
int count = 10;
// create 10 mob files.
createStoreFiles(basePath, family, qf, count, Type.Put);
// create 10 del files
createStoreFiles(basePath, family, qf, count, Type.Delete);
listFiles();
long mergeSize = 4000;
List<String> expectedStartKeys = new ArrayList<>();
for (FileStatus file : mobFiles) {
if (file.getLen() < mergeSize) {
String fileName = file.getPath().getName();
String startKey = fileName.substring(0, 32);
expectedStartKeys.add(startKey);
}
}
// set the mob file compaction mergeable threshold
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
testSelectFiles(tableName, CompactionType.PART_FILES, expectedStartKeys);
}
@Test
public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
resetConf();
String tableName = "testCompactDelFilesWithDefaultBatchSize";
init(tableName);
// create 20 mob files.
createStoreFiles(basePath, family, qf, 20, Type.Put);
// create 13 del files
createStoreFiles(basePath, family, qf, 13, Type.Delete);
listFiles();
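// all 13 del files fit into one batch under the default batch size, so they
// should be merged into a single del file that still holds all 13 del cells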
testCompactDelFiles(tableName, 1, 13);
}
@Test
public void testCompactDelFilesWithSmallBatchSize() throws Exception {
resetConf();
String tableName = "testCompactDelFilesWithSmallBatchSize";
init(tableName);
// create 20 mob files.
createStoreFiles(basePath, family, qf, 20, Type.Put);
// create 13 del files
createStoreFiles(basePath, family, qf, 13, Type.Delete);
listFiles();
// set the mob file compaction batch size
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4);
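// with a batch size of 4 the 13 del files are expected to merge as 13 -> 4 -> 1,
// since 4 still exceeds the default max del file count of 3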
testCompactDelFiles(tableName, 1, 13);
}
@Test
public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
resetConf();
String tableName = "testCompactDelFilesWithSmallBatchSize";
init(tableName);
// create 20 mob files.
createStoreFiles(basePath, family, qf, 20, Type.Put);
// create 13 del files
createStoreFiles(basePath, family, qf, 13, Type.Delete);
listFiles();
// set the max del file count
conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
// set the mob file compaction batch size
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2);
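// with a max del file count of 5 and a batch size of 2, the 13 del files are
// expected to merge as 13 -> 7 -> 4; merging stops once the count no longer
// exceeds the max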
testCompactDelFiles(tableName, 4, 13);
}
/**
* Tests the selectFiles method.
* @param tableName the table name
* @param type the expected compaction type
* @param expected the expected start keys
*/
private void testSelectFiles(String tableName, final CompactionType type,
final List<String> expected) throws IOException {
PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
TableName.valueOf(tableName), hcd, pool) {
@Override
public List<Path> compact(List<FileStatus> files) throws IOException {
if (files == null || files.isEmpty()) {
return null;
}
PartitionedMobFileCompactionRequest request = select(files);
// assert the compaction type matches the expected type
Assert.assertEquals(type, request.type);
// assert the right partitions are selected
compareCompactedPartitions(expected, request.compactionPartitions);
// assert the right del files are selected
compareDelFiles(request.delFiles);
return null;
}
};
compactor.compact(allFiles);
}
/**
* Tests the compactDelFiles method.
* @param tableName the table name
* @param expectedFileCount the expected del file count after compaction
* @param expectedCellCount the expected del cell count in the compacted del files
*/
private void testCompactDelFiles(String tableName, final int expectedFileCount,
final int expectedCellCount) throws IOException {
PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
TableName.valueOf(tableName), hcd, pool) {
@Override
protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
throws IOException {
List<Path> delFilePaths = new ArrayList<Path>();
for (FileStatus delFile : request.delFiles) {
delFilePaths.add(delFile.getPath());
}
List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
// assert the del files are merged.
Assert.assertEquals(expectedFileCount, newDelPaths.size());
Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
return null;
}
};
compactor.compact(allFiles);
}
/**
* Lists the files in the base path and splits them into mob files and del files.
*/
private void listFiles() throws IOException {
for (FileStatus file : fs.listStatus(basePath)) {
allFiles.add(file);
if (file.getPath().getName().endsWith("_del")) {
delFiles.add(file);
} else {
mobFiles.add(file);
}
}
}
/**
* Compares the compacted partitions with the expected start keys.
* @param expected the expected start keys
* @param partitions the collection of CompactionPartitions
*/
private void compareCompactedPartitions(List<String> expected,
Collection<CompactionPartition> partitions) {
List<String> actualKeys = new ArrayList<>();
for (CompactionPartition partition : partitions) {
actualKeys.add(partition.getPartitionId().getStartKey());
}
Collections.sort(expected);
Collections.sort(actualKeys);
Assert.assertEquals(expected.size(), actualKeys.size());
for (int i = 0; i < expected.size(); i++) {
Assert.assertEquals(expected.get(i), actualKeys.get(i));
}
}
/**
* Compares the del files.
* @param allDelFiles all the del files
*/
private void compareDelFiles(Collection<FileStatus> allDelFiles) {
int i = 0;
for (FileStatus file : allDelFiles) {
Assert.assertEquals(delFiles.get(i), file);
i++;
}
}
/**
* Creates store files.
* @param basePath the path where the files are created
* @param family the family name
* @param qualifier the column qualifier
* @param count the store file number
* @param type the key type
*/
private void createStoreFiles(Path basePath, String family, String qualifier, int count,
Type type) throws IOException {
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
String startKey = "row_";
MobFileName mobFileName = null;
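// value sizes grow with i ((i + 1) * 1000 bytes), so a size threshold can
// split the created files into small and large groups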
for (int i = 0; i < count; i++) {
byte[] startRow = Bytes.toBytes(startKey + i) ;
if(type.equals(Type.Delete)) {
mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
new Date()), delSuffix);
}
if(type.equals(Type.Put)){
mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate(
new Date()), mobSuffix);
}
StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
.withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();
writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
type, (i+1)*1000);
}
}
/**
* Writes data to store file.
* @param writer the store file writer
* @param row the row key
* @param family the family name
* @param qualifier the column qualifier
* @param type the key type
* @param size the size of value
*/
private static void writeStoreFile(final StoreFile.Writer writer, byte[]row, byte[] family,
byte[] qualifier, Type type, int size) throws IOException {
long now = System.currentTimeMillis();
try {
byte[] dummyData = new byte[size];
new Random().nextBytes(dummyData);
writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
} finally {
writer.close();
}
}
/**
* Gets the number of del cells in the del files.
* @param paths the del file paths
* @return the number of del cells
*/
private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
List<StoreFile> sfs = new ArrayList<StoreFile>();
int size = 0;
for(Path path : paths) {
StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
sfs.add(sf);
}
List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
false, null, HConstants.LATEST_TIMESTAMP);
Scan scan = new Scan();
scan.setMaxVersions(hcd.getMaxVersions());
long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
long ttl = HStore.determineTTLFromFamily(hcd);
ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
scanners, 0L, HConstants.LATEST_TIMESTAMP);
List<Cell> results = new ArrayList<>();
boolean hasMore = true;
while (hasMore) {
hasMore = scanner.next(results);
size += results.size();
results.clear();
}
scanner.close();
return size;
}
private static ExecutorService createThreadPool(Configuration conf) {
int maxThreads = 10;
long keepAliveTime = 60;
final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// waiting for a thread to pick up instead of throwing exceptions.
queue.put(r);
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
}
}
});
pool.allowCoreThreadTimeOut(true);
return pool;
}
/**
* Resets the configuration.
*/
private void resetConf() {
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
}
}