HBASE-13012 Add shell commands to trigger the mob file compactor (Jingcheng Du and Jiajia Li)

Jonathan M Hsieh 2015-03-04 23:47:05 -08:00
parent d55405a7ce
commit 47ed5cd7ed
17 changed files with 989 additions and 184 deletions

View File

@ -1362,4 +1362,56 @@ public interface Admin extends Abortable, Closeable {
* @throws IOException
*/
public int getMasterInfoPort() throws IOException;
/**
* Compact the mob files in all mob-enabled column families. Asynchronous operation.
*
* @param tableName table to compact
* @throws IOException
* @throws InterruptedException
*/
void compactMob(final TableName tableName) throws IOException,
InterruptedException;
/**
* Compact the mob files in a mob-enabled column family. Asynchronous operation.
*
* @param tableName table to compact
* @param columnFamily column family within a table
* @throws IOException if not a mob column family or if a remote or network exception occurs
* @throws InterruptedException
*/
void compactMob(final TableName tableName, final byte[] columnFamily) throws IOException,
InterruptedException;
/**
* Major compact the mob files in all mob-enabled column families. Asynchronous operation.
*
* @param tableName table to compact
* @throws IOException
* @throws InterruptedException
*/
void majorCompactMob(final TableName tableName) throws IOException,
InterruptedException;
/**
* Major compact the mob files in a mob-enabled column family. Asynchronous operation.
*
* @param tableName table to compact
* @param columnFamily column family within a table
* @throws IOException if not a mob column family or if a remote or network exception occurs
* @throws InterruptedException
*/
void majorCompactMob(final TableName tableName, final byte[] columnFamily) throws IOException,
InterruptedException;
/**
* Get the current mob file compaction state of a table. It could be in a compaction, or none.
*
* @param tableName table to examine
* @return the current compaction state
* @throws IOException if a remote or network exception occurs
*/
AdminProtos.GetRegionInfoResponse.CompactionState getMobCompactionState(final TableName tableName)
throws IOException;
}
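
Below is a minimal client-side sketch (not part of this patch) of how the new Admin methods might be driven; the table name "t1" and column family "f" are hypothetical, and the connection setup assumes the standard ConnectionFactory API.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.util.Bytes;

public class MobCompactionClientSketch {
  public static void main(String[] args) throws Exception {
    TableName table = TableName.valueOf("t1"); // hypothetical mob-enabled table
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = connection.getAdmin()) {
      // Trigger an asynchronous mob file compaction on the hypothetical family "f".
      admin.compactMob(table, Bytes.toBytes("f"));
      // Poll the rough compaction state reported by the master until it returns to NONE.
      while (admin.getMobCompactionState(table) != CompactionState.NONE) {
        Thread.sleep(1000);
      }
    }
  }
}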

View File

@ -3819,4 +3819,103 @@ public class HBaseAdmin implements Admin {
}
});
}
/**
* {@inheritDoc}
*/
@Override
public void compactMob(final TableName tableName, final byte[] columnFamily)
throws IOException, InterruptedException {
checkTableNameNotNull(tableName);
checkFamilyNameNotNull(columnFamily);
validateMobColumnFamily(tableName, columnFamily);
compactMob(tableName, columnFamily, false);
}
/**
* {@inheritDoc}
*/
@Override
public void compactMob(final TableName tableName) throws IOException, InterruptedException {
checkTableNameNotNull(tableName);
compactMob(tableName, null, false);
}
/**
* {@inheritDoc}
*/
@Override
public void majorCompactMob(final TableName tableName, final byte[] columnFamily)
throws IOException, InterruptedException {
checkTableNameNotNull(tableName);
checkFamilyNameNotNull(columnFamily);
validateMobColumnFamily(tableName, columnFamily);
compactMob(tableName, columnFamily, true);
}
/**
* {@inheritDoc}
*/
@Override
public void majorCompactMob(final TableName tableName) throws IOException, InterruptedException {
checkTableNameNotNull(tableName);
compactMob(tableName, null, true);
}
@Override
public CompactionState getMobCompactionState(TableName tableName) throws IOException {
checkTableNameNotNull(tableName);
try {
ServerName master = getClusterStatus().getMaster();
HRegionInfo info = new HRegionInfo(tableName, Bytes.toBytes(".mob"),
HConstants.EMPTY_END_ROW, false, 0);
GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
info.getRegionName(), true);
GetRegionInfoResponse response = this.connection.getAdmin(master)
.getRegionInfo(null, request);
return response.getCompactionState();
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
/**
* Compacts the mob files in a mob-enabled column family. Asynchronous operation.
* @param tableName The table to compact.
* @param columnFamily The column family to compact. If it is null, all the mob-enabled
* column families in this table will be compacted.
* @param major Whether to select all the mob files in the compaction.
* @throws IOException
* @throws InterruptedException
*/
private void compactMob(final TableName tableName, final byte[] columnFamily, boolean major)
throws IOException, InterruptedException {
// get the mob region info, this is a dummy region.
HRegionInfo info = new HRegionInfo(tableName, Bytes.toBytes(".mob"), HConstants.EMPTY_END_ROW,
false, 0);
ServerName master = getClusterStatus().getMaster();
compact(master, info, major, columnFamily);
}
private void checkTableNameNotNull(TableName tableName) {
if (tableName == null) {
throw new IllegalArgumentException("TableName cannot be null");
}
}
private void checkFamilyNameNotNull(byte[] columnFamily) {
if (columnFamily == null) {
throw new IllegalArgumentException("The column family name cannot be null");
}
}
private void validateMobColumnFamily(TableName tableName, byte[] columnFamily)
throws IOException {
HTableDescriptor htd = getTableDescriptor(tableName);
HColumnDescriptor family = htd.getFamily(columnFamily);
if (family == null || !family.isMobEnabled()) {
throw new IllegalArgumentException("Column family " + columnFamily
+ " is not a mob column family");
}
}
}

View File

@ -1462,6 +1462,14 @@ possible configurations would overwhelm and obscure the important.
<name>hbase.http.staticuser.user</name>
<value>dr.stack</value>
</property>
<property>
<name>hbase.regionserver.handler.abort.on.error.percent</name>
<value>0.5</value>
<description>The percent of region server RPC threads that must fail before the RS aborts.
-1 Disable aborting; 0 Abort if even a single handler has died;
0.x Abort only when this percent of handlers have died;
1 Abort only when all of the handlers have died.</description>
</property>
<!-- Mob properties. -->
<property>
<name>hbase.mob.file.cache.size</name>
@ -1555,7 +1563,7 @@ possible configurations would overwhelm and obscure the important.
</description>
</property>
<property>
<name>hbase.master.mob.file.compaction.chore.period</name>
<name>hbase.mob.file.compaction.chore.period</name>
<value>604800000</value>
<description>
The period that MobFileCompactionChore runs. The unit is millisecond.
@ -1570,18 +1578,10 @@ possible configurations would overwhelm and obscure the important.
</description>
</property>
<property>
<name>hbase.master.mob.file.compaction.chore.threads.max</name>
<name>hbase.mob.file.compaction.threads.max</name>
<value>1</value>
<description>
The max number of threads used in MobFileCompactionChore.
The max number of threads used in MobFileCompactor.
</description>
</property>
<property>
<name>hbase.regionserver.handler.abort.on.error.percent</name>
<value>0.5</value>
<description>The percent of region server RPC threads failed to abort RS.
-1 Disable aborting; 0 Abort if even a single handler has died;
0.x Abort only when this percent of handlers have died;
1 Abort only all of the handers have died.</description>
</property>
</configuration>

View File

@ -36,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@ -110,6 +111,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
@ -128,6 +130,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HBaseFsckRepair;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
@ -278,6 +281,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
private HFileCleaner hfileCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
private MobFileCompactionChore mobFileCompactChore;
MasterMobFileCompactionThread mobFileCompactThread;
// used to synchronize the mobFileCompactionStates
private final IdLock mobFileCompactionLock = new IdLock();
// save the information of mob file compactions in tables.
// the key is table name, the value is the number of compactions in that table.
private Map<TableName, AtomicInteger> mobFileCompactionStates = Maps.newConcurrentMap();
MasterCoprocessorHost cpHost;
@ -774,6 +783,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
this.mobFileCompactChore = new MobFileCompactionChore(this);
getChoreService().scheduleChore(mobFileCompactChore);
this.mobFileCompactThread = new MasterMobFileCompactionThread(this);
if (this.cpHost != null) {
// don't let cp initialization errors kill the master
@ -1087,6 +1097,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
if (this.clusterStatusPublisherChore != null){
clusterStatusPublisherChore.cancel(true);
}
if (this.mobFileCompactThread != null) {
this.mobFileCompactThread.close();
}
}
/**
@ -2318,4 +2331,58 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
}
/**
* Gets the mob file compaction state for a specific table.
* Whether all the mob files are selected is known only during the compaction execution, but
* the state is reported just before the compaction starts, so the compaction type cannot be
* determined at that time. Therefore only rough statistics are kept for the mob file
* compaction, and only two compaction states are reported, CompactionState.MAJOR_AND_MINOR
* and CompactionState.NONE.
* @param tableName The current table name.
* @return The current mob file compaction state of the given table.
*/
public CompactionState getMobCompactionState(TableName tableName) {
AtomicInteger compactionsCount = mobFileCompactionStates.get(tableName);
if (compactionsCount != null && compactionsCount.get() != 0) {
return CompactionState.MAJOR_AND_MINOR;
}
return CompactionState.NONE;
}
public void reportMobFileCompactionStart(TableName tableName) throws IOException {
IdLock.Entry lockEntry = null;
try {
lockEntry = mobFileCompactionLock.getLockEntry(tableName.hashCode());
AtomicInteger compactionsCount = mobFileCompactionStates.get(tableName);
if (compactionsCount == null) {
compactionsCount = new AtomicInteger(0);
mobFileCompactionStates.put(tableName, compactionsCount);
}
compactionsCount.incrementAndGet();
} finally {
if (lockEntry != null) {
mobFileCompactionLock.releaseLockEntry(lockEntry);
}
}
}
public void reportMobFileCompactionEnd(TableName tableName) throws IOException {
IdLock.Entry lockEntry = null;
try {
lockEntry = mobFileCompactionLock.getLockEntry(tableName.hashCode());
AtomicInteger compactionsCount = mobFileCompactionStates.get(tableName);
if (compactionsCount != null) {
int count = compactionsCount.decrementAndGet();
// remove the entry if the count is 0.
if (count == 0) {
mobFileCompactionStates.remove(tableName);
}
}
} finally {
if (lockEntry != null) {
mobFileCompactionLock.releaseLockEntry(lockEntry);
}
}
}
}

View File

@ -0,0 +1,184 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* The mob file compaction thread used in {@link MasterRpcServices}
*/
@InterfaceAudience.Private
public class MasterMobFileCompactionThread {
static final Log LOG = LogFactory.getLog(MasterMobFileCompactionThread.class);
private final HMaster master;
private final Configuration conf;
private final ExecutorService mobFileCompactorPool;
private final ExecutorService masterMobPool;
public MasterMobFileCompactionThread(HMaster master) {
this.master = master;
this.conf = master.getConfiguration();
final String n = Thread.currentThread().getName();
// this pool is used to run the mob file compaction
this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(n + "-MasterMobFileCompaction-" + EnvironmentEdgeManager.currentTime());
return t;
}
});
((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
// this pool is used in the mob file compaction to compact the mob files by partitions
// in parallel
this.mobFileCompactorPool = MobUtils
.createMobFileCompactorThreadPool(master.getConfiguration());
}
/**
* Requests a mob file compaction.
* @param conf The Configuration.
* @param fs The file system.
* @param tableName The table to compact.
* @param hcds The column descriptors.
* @param tableLockManager The table lock manager.
* @param isForceAllFiles Whether to add all mob files into the compaction.
*/
public void requestMobFileCompaction(Configuration conf, FileSystem fs, TableName tableName,
List<HColumnDescriptor> hcds, TableLockManager tableLockManager, boolean isForceAllFiles)
throws IOException {
master.reportMobFileCompactionStart(tableName);
try {
masterMobPool.execute(new CompactionRunner(fs, tableName, hcds, tableLockManager,
isForceAllFiles, mobFileCompactorPool));
} catch (RejectedExecutionException e) {
// in case the request is rejected by the pool
try {
master.reportMobFileCompactionEnd(tableName);
} catch (IOException e1) {
LOG.error("Failed to mark end of mob file compation", e1);
}
throw e;
}
if (LOG.isDebugEnabled()) {
LOG.debug("The mob file compaction is requested for the columns " + hcds + " of the table "
+ tableName.getNameAsString());
}
}
private class CompactionRunner implements Runnable {
private FileSystem fs;
private TableName tableName;
private List<HColumnDescriptor> hcds;
private TableLockManager tableLockManager;
private boolean isForceAllFiles;
private ExecutorService pool;
public CompactionRunner(FileSystem fs, TableName tableName, List<HColumnDescriptor> hcds,
TableLockManager tableLockManager, boolean isForceAllFiles, ExecutorService pool) {
super();
this.fs = fs;
this.tableName = tableName;
this.hcds = hcds;
this.tableLockManager = tableLockManager;
this.isForceAllFiles = isForceAllFiles;
this.pool = pool;
}
@Override
public void run() {
try {
for (HColumnDescriptor hcd : hcds) {
MobUtils.doMobFileCompaction(conf, fs, tableName, hcd, pool, tableLockManager,
isForceAllFiles);
}
} catch (IOException e) {
LOG.error("Failed to perform the mob file compaction", e);
} finally {
try {
master.reportMobFileCompactionEnd(tableName);
} catch (IOException e) {
LOG.error("Failed to mark end of mob file compation", e);
}
}
}
}
/**
* Only interrupt once it's done with a run through the work loop.
*/
private void interruptIfNecessary() {
mobFileCompactorPool.shutdown();
masterMobPool.shutdown();
}
/**
* Waits for all the threads to finish.
*/
private void join() {
waitFor(mobFileCompactorPool, "Mob file Compaction Thread");
waitFor(masterMobPool, "Region Server Mob File Compaction Thread");
}
/**
* Closes the MasterMobFileCompactionThread.
*/
public void close() {
interruptIfNecessary();
join();
}
/**
* Waits for the given thread pool to finish.
* @param t the thread pool to wait for
* @param name the thread pool name.
*/
private void waitFor(ExecutorService t, String name) {
boolean done = false;
while (!done) {
try {
done = t.awaitTermination(60, TimeUnit.SECONDS);
LOG.info("Waiting for " + name + " to finish...");
if (!done) {
t.shutdownNow();
}
} catch (InterruptedException ie) {
LOG.warn("Interrupted waiting for " + name + " to finish...");
}
}
}
}

View File

@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -43,11 +44,16 @@ import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.*;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@ -1304,4 +1310,106 @@ public class MasterRpcServices extends RSRpcServices
}
return response.build();
}
/**
* Compacts a region on the master. If the request targets the dummy mob region of a table,
* a mob file compaction is triggered instead.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public CompactRegionResponse compactRegion(final RpcController controller,
final CompactRegionRequest request) throws ServiceException {
try {
master.checkInitialized();
byte[] regionName = request.getRegion().getValue().toByteArray();
TableName tableName = HRegionInfo.getTable(regionName);
// if the region is a mob region, do the mob file compaction.
if (MobUtils.isMobRegionName(tableName, regionName)) {
return compactMob(request, tableName);
} else {
return super.compactRegion(controller, request);
}
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public GetRegionInfoResponse getRegionInfo(final RpcController controller,
final GetRegionInfoRequest request) throws ServiceException {
try {
master.checkInitialized();
byte[] regionName = request.getRegion().getValue().toByteArray();
TableName tableName = HRegionInfo.getTable(regionName);
if (MobUtils.isMobRegionName(tableName, regionName)) {
// a dummy region info contains the compaction state.
HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(tableName);
GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
builder.setRegionInfo(HRegionInfo.convert(mobRegionInfo));
if (request.hasCompactionState() && request.getCompactionState()) {
builder.setCompactionState(master.getMobCompactionState(tableName));
}
return builder.build();
} else {
return super.getRegionInfo(controller, request);
}
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
/**
* Compacts the mob files in the current table.
* @param request the request.
* @param tableName the current table name.
* @return The response of the mob file compaction.
* @throws IOException
*/
private CompactRegionResponse compactMob(final CompactRegionRequest request,
TableName tableName) throws IOException {
boolean isForceAllFiles = false;
List<HColumnDescriptor> compactedColumns = new ArrayList<HColumnDescriptor>();
HColumnDescriptor[] hcds = master.getTableDescriptors().get(tableName).getColumnFamilies();
byte[] family = null;
if (request.hasFamily()) {
family = request.getFamily().toByteArray();
for (HColumnDescriptor hcd : hcds) {
if (Bytes.equals(family, hcd.getName())) {
if (!hcd.isMobEnabled()) {
LOG.error("Column family " + hcd.getName() + " is not a mob column family");
throw new DoNotRetryIOException("Column family " + hcd.getName()
+ " is not a mob column family");
}
compactedColumns.add(hcd);
}
}
} else {
for (HColumnDescriptor hcd : hcds) {
if (hcd.isMobEnabled()) {
compactedColumns.add(hcd);
}
}
}
if (compactedColumns.isEmpty()) {
LOG.error("No mob column families are assigned in the mob file compaction");
throw new DoNotRetryIOException(
"No mob column families are assigned in the mob file compaction");
}
if (request.hasMajor() && request.getMajor()) {
isForceAllFiles = true;
}
String familyLogMsg = (family != null) ? Bytes.toString(family) : "";
if (LOG.isTraceEnabled()) {
LOG.trace("User-triggered mob file compaction requested for table: "
+ tableName.getNameAsString() + " for column family: " + familyLogMsg);
}
master.mobFileCompactThread.requestMobFileCompaction(master.getConfiguration(),
master.getFileSystem(), tableName, compactedColumns,
master.getTableLockManager(), isForceAllFiles);
return CompactRegionResponse.newBuilder().build();
}
}

View File

@ -18,33 +18,18 @@
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
/**
* The class MobFileCompactionChore regularly runs compaction to merge small mob files.
@ -63,62 +48,31 @@ public class MobFileCompactionChore extends ScheduledChore {
MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD));
this.master = master;
this.tableLockManager = master.getTableLockManager();
this.pool = createThreadPool();
this.pool = MobUtils.createMobFileCompactorThreadPool(master.getConfiguration());
}
@Override
protected void chore() {
try {
String className = master.getConfiguration().get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY,
PartitionedMobFileCompactor.class.getName());
TableDescriptors htds = master.getTableDescriptors();
Map<String, HTableDescriptor> map = htds.getAll();
for (HTableDescriptor htd : map.values()) {
for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
if (!hcd.isMobEnabled()) {
continue;
}
// instantiate the mob file compactor.
MobFileCompactor compactor = null;
try {
compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class,
ExecutorService.class },
new Object[] { master.getConfiguration(), master.getFileSystem(), htd.getTableName(),
hcd, pool });
} catch (Exception e) {
throw new IOException("Unable to load configured mob file compactor '" + className
+ "'", e);
}
// compact only for mob-enabled column.
// obtain a write table lock before performing compaction to avoid race condition
// with major compaction in mob-enabled column.
boolean tableLocked = false;
TableLock lock = null;
try {
// the tableLockManager might be null in testing. In that case, it is lock-free.
if (tableLockManager != null) {
lock = tableLockManager.writeLock(MobUtils.getTableLockName(htd.getTableName()),
"Run MobFileCompactChore");
lock.acquire();
boolean reported = false;
try {
for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
if (!hcd.isMobEnabled()) {
continue;
}
tableLocked = true;
compactor.compact();
} catch (LockTimeoutException e) {
LOG.info("Fail to acquire the lock because of timeout, maybe a major compaction or an"
+ " ExpiredMobFileCleanerChore is running", e);
} catch (Exception e) {
LOG.error("Fail to compact the mob files for the column " + hcd.getNameAsString()
+ " in the table " + htd.getNameAsString(), e);
} finally {
if (lock != null && tableLocked) {
try {
lock.release();
} catch (IOException e) {
LOG.error(
"Fail to release the write lock for the table " + htd.getNameAsString(), e);
}
if (!reported) {
master.reportMobFileCompactionStart(htd.getTableName());
reported = true;
}
MobUtils.doMobFileCompaction(master.getConfiguration(), master.getFileSystem(),
htd.getTableName(), hcd, pool, tableLockManager, false);
}
} finally {
if (reported) {
master.reportMobFileCompactionEnd(htd.getTableName());
}
}
}
@ -132,35 +86,4 @@ public class MobFileCompactionChore extends ScheduledChore {
super.cleanup();
pool.shutdown();
}
/**
* Creates a thread pool.
* @return A thread pool.
*/
private ExecutorService createThreadPool() {
Configuration conf = master.getConfiguration();
int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_MAX,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX);
if (maxThreads == 0) {
maxThreads = 1;
}
long keepAliveTime = conf.getLong(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME);
final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// waiting for a thread to pick up instead of throwing exceptions.
queue.put(r);
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
}
}
});
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
return pool;
}
}

View File

@ -104,19 +104,16 @@ public class MobConstants {
* The default value is one week.
*/
public static final String MOB_FILE_COMPACTION_CHORE_PERIOD =
"hbase.master.mob.file.compaction.chore.period";
"hbase.mob.file.compaction.chore.period";
public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD =
24 * 60 * 60 * 1000 * 7; // a week
public static final String MOB_FILE_COMPACTOR_CLASS_KEY = "hbase.mob.file.compactor.class";
/**
* The max number of threads used in MobFileCompactionChore.
* The max number of threads used in MobFileCompactor.
*/
public static final String MOB_FILE_COMPACTION_CHORE_THREADS_MAX =
"hbase.master.mob.file.compaction.chore.threads.max";
public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX = 1;
public static final String MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME =
"hbase.master.mob.file.compaction.chore.threads.keepalivetime";
public static final long DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME = 60;
public static final String MOB_FILE_COMPACTION_THREADS_MAX =
"hbase.mob.file.compaction.threads.max";
public static final int DEFAULT_MOB_FILE_COMPACTION_THREADS_MAX = 1;
private MobConstants() {
}

View File

@ -27,6 +27,12 @@ import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -51,11 +57,17 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
/**
* The mob utilities
@ -357,6 +369,16 @@ public class MobUtils {
.equals(regionInfo.getEncodedName());
}
/**
* Gets whether the current region name follows the pattern of a mob region name.
* @param tableName The current table name.
* @param regionName The current region name.
* @return True if the current region name follows the pattern of a mob region name.
*/
public static boolean isMobRegionName(TableName tableName, byte[] regionName) {
return Bytes.equals(regionName, getMobRegionInfo(tableName).getRegionName());
}
/**
* Gets the working directory of the mob compaction.
* @param root The root directory of the mob compaction.
@ -645,4 +667,85 @@ public class MobUtils {
byte[] tableName = tn.getName();
return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
}
/**
* Performs the mob file compaction.
* @param conf the Configuration
* @param fs the file system
* @param tableName the table to compact
* @param hcd the column descriptor
* @param pool the thread pool
* @param tableLockManager the table lock manager
* @param isForceAllFiles Whether to add all mob files into the compaction.
*/
public static void doMobFileCompaction(Configuration conf, FileSystem fs, TableName tableName,
HColumnDescriptor hcd, ExecutorService pool, TableLockManager tableLockManager,
boolean isForceAllFiles) throws IOException {
String className = conf.get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY,
PartitionedMobFileCompactor.class.getName());
// instantiate the mob file compactor.
MobFileCompactor compactor = null;
try {
compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class,
ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool });
} catch (Exception e) {
throw new IOException("Unable to load configured mob file compactor '" + className + "'", e);
}
// compact only the mob-enabled column.
// obtain a write table lock before performing the compaction to avoid a race condition
// with a major compaction in the mob-enabled column.
boolean tableLocked = false;
TableLock lock = null;
try {
// the tableLockManager might be null in testing. In that case, it is lock-free.
if (tableLockManager != null) {
lock = tableLockManager.writeLock(MobUtils.getTableLockName(tableName),
"Run MobFileCompaction");
lock.acquire();
}
tableLocked = true;
compactor.compact(isForceAllFiles);
} catch (Exception e) {
LOG.error("Fail to compact the mob files for the column " + hcd.getNameAsString()
+ " in the table " + tableName.getNameAsString(), e);
} finally {
if (lock != null && tableLocked) {
try {
lock.release();
} catch (IOException e) {
LOG.error("Fail to release the write lock for the table " + tableName.getNameAsString(),
e);
}
}
}
}
/**
* Creates a thread pool for the mob file compaction.
* @param conf the Configuration
* @return A thread pool.
*/
public static ExecutorService createMobFileCompactorThreadPool(Configuration conf) {
int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_THREADS_MAX,
MobConstants.DEFAULT_MOB_FILE_COMPACTION_THREADS_MAX);
if (maxThreads == 0) {
maxThreads = 1;
}
final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
Threads.newDaemonThreadFactory("MobFileCompactor"), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// waiting for a thread to pick up instead of throwing exceptions.
queue.put(r);
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
}
}
});
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
return pool;
}
}
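
Because doMobFileCompaction instantiates the compactor reflectively from hbase.mob.file.compactor.class, a deployment can plug in its own implementation. The following is a hedged sketch of such a plug-in; the class name LoggingMobFileCompactor is hypothetical, and it assumes the custom class can extend PartitionedMobFileCompactor and reuse the five-argument constructor expected by the reflection call above.

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;

/** A hypothetical compactor that only adds logging around the default behavior. */
public class LoggingMobFileCompactor extends PartitionedMobFileCompactor {
  private static final Log LOG = LogFactory.getLog(LoggingMobFileCompactor.class);

  public LoggingMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
      HColumnDescriptor column, ExecutorService pool) {
    super(conf, fs, tableName, column, pool);
  }

  @Override
  public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) throws IOException {
    LOG.info("Compacting " + (files == null ? 0 : files.size()) + " candidate mob files, "
        + "isForceAllFiles=" + isForceAllFiles);
    return super.compact(files, isForceAllFiles);
  }
}

Such a class would be selected by setting hbase.mob.file.compactor.class (MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY) to its fully qualified name.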

View File

@ -65,14 +65,26 @@ public abstract class MobFileCompactor {
* @throws IOException
*/
public List<Path> compact() throws IOException {
return compact(Arrays.asList(fs.listStatus(mobFamilyDir)));
return compact(false);
}
/**
* Compacts the mob files by compaction type for the current column family.
* @param isForceAllFiles Whether to add all mob files into the compaction.
* @return The paths of new mob files generated in the compaction.
* @throws IOException
*/
public List<Path> compact(boolean isForceAllFiles) throws IOException {
return compact(Arrays.asList(fs.listStatus(mobFamilyDir)), isForceAllFiles);
}
/**
* Compacts the candidate mob files.
* @param files The candidate mob files.
* @param isForceAllFiles Whether to add all mob files into the compaction.
* @return The paths of new mob files generated in the compaction.
* @throws IOException
*/
public abstract List<Path> compact(List<FileStatus> files) throws IOException;
public abstract List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
throws IOException;
}

View File

@ -110,12 +110,14 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
}
@Override
public List<Path> compact(List<FileStatus> files) throws IOException {
public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) throws IOException {
if (files == null || files.isEmpty()) {
LOG.info("No candidate mob files");
return null;
}
LOG.info("isForceAllFiles: " + isForceAllFiles);
// find the files to compact.
PartitionedMobFileCompactionRequest request = select(files);
PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
// compact the files.
return performCompaction(request);
}
@ -124,11 +126,12 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
* Selects the compacted mob/del files.
* Iterates the candidates to find out all the del files and small mob files.
* @param candidates All the candidates.
* @param isForceAllFiles Whether to add all mob files into the compaction.
* @return A compaction request.
* @throws IOException
*/
protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates)
throws IOException {
protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates,
boolean isForceAllFiles) throws IOException {
Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
Map<CompactionPartitionId, CompactionPartition> filesToCompact =
new HashMap<CompactionPartitionId, CompactionPartition>();
@ -152,8 +155,9 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
}
if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
allDelFiles.add(file);
} else if (linkedFile.getLen() < mergeableSize) {
// add the small files to the merge pool
} else if (isForceAllFiles || linkedFile.getLen() < mergeableSize) {
// add all files if isForceAllFiles is true,
// otherwise add the small files to the merge pool
MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
fileName.getDate());
@ -174,6 +178,9 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
// all the files are selected
request.setCompactionType(CompactionType.ALL_FILES);
}
LOG.info("The compaction type is " + request.getCompactionType() + ", the request has "
+ allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and "
+ irrelevantFileCount + " irrelevant files");
return request;
}
@ -201,10 +208,14 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
newDelFiles.add(sf);
}
LOG.info("After merging, there are " + newDelFiles.size() + " del files");
// compact the mob files by partitions.
List<Path> paths = compactMobFiles(request, newDelFiles);
LOG.info("After compaction, there are " + paths.size() + " mob files");
// archive the del files if all the mob files are selected.
if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
LOG.info("After a mob file compaction with all files selected, archiving the del files "
+ newDelFiles);
try {
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
} catch (IOException e) {
@ -225,6 +236,7 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
final List<StoreFile> delFiles) throws IOException {
Collection<CompactionPartition> partitions = request.compactionPartitions;
if (partitions == null || partitions.isEmpty()) {
LOG.info("No partitions of mob files");
return Collections.emptyList();
}
List<Path> paths = new ArrayList<Path>();
@ -237,6 +249,7 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
@Override
public List<Path> call() throws Exception {
LOG.info("Compacting mob files for partition " + partition.getPartitionId());
return compactMobFilePartition(request, partition, delFiles, table);
}
}));
@ -310,6 +323,8 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
// move to the next batch.
offset += batch;
}
LOG.info("Compaction is finished. The number of mob files is changed from " + files.size()
+ " to " + newFiles.size());
return newFiles;
}

View File

@ -54,9 +54,11 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.AfterClass;
@ -71,28 +73,29 @@ public class TestMobFileCompactor {
private Configuration conf = null;
private String tableNameAsString;
private TableName tableName;
private static HTable hTable;
private static Admin admin;
private static HTableDescriptor desc;
private static HColumnDescriptor hcd1;
private static HColumnDescriptor hcd2;
private static FileSystem fs;
private final static String family1 = "family1";
private final static String family2 = "family2";
private final static String qf1 = "qualifier1";
private final static String qf2 = "qualifier2";
private static byte[] KEYS = Bytes.toBytes("012");
private static int regionNum = KEYS.length;
private static int delRowNum = 1;
private static int delCellNum = 6;
private static int cellNumPerRow = 3;
private static int rowNumPerFile = 2;
private HTable hTable;
private Admin admin;
private HTableDescriptor desc;
private HColumnDescriptor hcd1;
private HColumnDescriptor hcd2;
private FileSystem fs;
private final String family1 = "family1";
private final String family2 = "family2";
private final String qf1 = "qualifier1";
private final String qf2 = "qualifier2";
private byte[] KEYS = Bytes.toBytes("012");
private int regionNum = KEYS.length;
private int delRowNum = 1;
private int delCellNum = 6;
private int cellNumPerRow = 3;
private int rowNumPerFile = 2;
private static ExecutorService pool;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
TEST_UTIL.getConfiguration().setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, 5000);
TEST_UTIL.startMiniCluster(1);
pool = createThreadPool(TEST_UTIL.getConfiguration());
}
@ -208,41 +211,6 @@ public class TestMobFileCompactor {
assertRefFileNameEqual(family1);
}
private void assertRefFileNameEqual(String familyName) throws IOException {
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(familyName));
// Do not retrieve the mob data when scanning
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
ResultScanner results = hTable.getScanner(scan);
Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
tableName), familyName);
List<Path> actualFilePaths = new ArrayList<>();
List<Path> expectFilePaths = new ArrayList<>();
for (Result res : results) {
for (Cell cell : res.listCells()) {
byte[] referenceValue = CellUtil.cloneValue(cell);
String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
referenceValue.length - Bytes.SIZEOF_INT);
Path targetPath = new Path(mobFamilyPath, fileName);
if(!actualFilePaths.contains(targetPath)) {
actualFilePaths.add(targetPath);
}
}
}
results.close();
if (fs.exists(mobFamilyPath)) {
FileStatus[] files = fs.listStatus(mobFamilyPath);
for (FileStatus file : files) {
if (!StoreFileInfo.isDelFile(file.getPath())) {
expectFilePaths.add(file.getPath());
}
}
}
Collections.sort(actualFilePaths);
Collections.sort(expectFilePaths);
assertEquals(expectFilePaths, actualFilePaths);
}
@Test
public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception {
resetConf();
@ -428,6 +396,117 @@ public class TestMobFileCompactor {
assertEquals("After second compaction: family2 del file count", 0, countFiles(false, family2));
assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
assertRefFileNameEqual(family1);
}
@Test
public void testCompactionFromAdmin() throws Exception {
int count = 4;
// generate mob files
loadData(count, rowNumPerFile);
int rowNumPerRegion = count*rowNumPerFile;
assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
countMobRows(hTable));
assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
countMobCells(hTable));
assertEquals("Before deleting: family1 mob file count", regionNum*count,
countFiles(true, family1));
assertEquals("Before deleting: family2 mob file count", regionNum*count,
countFiles(true, family2));
createDelFile();
assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("Before compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
assertEquals("Before compaction: family1 mob file count", regionNum*count,
countFiles(true, family1));
assertEquals("Before compaction: family2 file count", regionNum*count,
countFiles(true, family2));
assertEquals("Before compaction: family1 del file count", regionNum,
countFiles(false, family1));
assertEquals("Before compaction: family2 del file count", regionNum,
countFiles(false, family2));
int largeFilesCount = countLargeFiles(5000, family1);
// do the mob file compaction
admin.compactMob(tableName, hcd1.getName());
waitUntilCompactionFinished(tableName);
assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
countMobRows(hTable));
assertEquals("After compaction: mob cells count", regionNum
* (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(hTable));
assertEquals("After compaction: family1 mob file count", regionNum + largeFilesCount,
countFiles(true, family1));
assertEquals("After compaction: family2 mob file count", regionNum * count,
countFiles(true, family2));
assertEquals("After compaction: family1 del file count", regionNum,
countFiles(false, family1));
assertEquals("After compaction: family2 del file count", regionNum,
countFiles(false, family2));
assertRefFileNameEqual(family1);
}
@Test
public void testMajorCompactionFromAdmin() throws Exception {
int count = 4;
// generate mob files
loadData(count, rowNumPerFile);
int rowNumPerRegion = count*rowNumPerFile;
assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
countMobRows(hTable));
assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
countMobCells(hTable));
assertEquals("Before deleting: mob file count", regionNum*count, countFiles(true, family1));
createDelFile();
assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("Before compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
assertEquals("Before compaction: family1 mob file count", regionNum*count,
countFiles(true, family1));
assertEquals("Before compaction: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("Before compaction: family1 del file count", regionNum,
countFiles(false, family1));
assertEquals("Before compaction: family2 del file count", regionNum,
countFiles(false, family2));
// do the major mob file compaction, it will force all files to compaction
admin.majorCompactMob(tableName, hcd1.getName());
waitUntilCompactionFinished(tableName);
assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
countMobRows(hTable));
assertEquals("After compaction: mob cells count",
regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
assertEquals("After compaction: family1 mob file count", regionNum,
countFiles(true, family1));
assertEquals("After compaction: family2 mob file count", regionNum*count,
countFiles(true, family2));
assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
assertEquals("After compaction: family2 del file count", regionNum,
countFiles(false, family2));
}
private void waitUntilCompactionFinished(TableName tableName) throws IOException,
InterruptedException {
long finished = EnvironmentEdgeManager.currentTime() + 60000;
CompactionState state = admin.getMobCompactionState(tableName);
while (EnvironmentEdgeManager.currentTime() < finished) {
if (state == CompactionState.NONE) {
break;
}
state = admin.getMobCompactionState(tableName);
Thread.sleep(10);
}
assertEquals(CompactionState.NONE, state);
}
/**
@ -610,7 +689,7 @@ public class TestMobFileCompactor {
/**
* Gets the split keys
*/
public static byte[][] getSplitKeys() {
private byte[][] getSplitKeys() {
byte[][] splitKeys = new byte[KEYS.length - 1][];
for (int i = 0; i < splitKeys.length; ++i) {
splitKeys[i] = new byte[] { KEYS[i + 1] };
@ -640,6 +719,41 @@ public class TestMobFileCompactor {
return pool;
}
private void assertRefFileNameEqual(String familyName) throws IOException {
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(familyName));
// Do not retrieve the mob data when scanning
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
ResultScanner results = hTable.getScanner(scan);
Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
tableName), familyName);
List<Path> actualFilePaths = new ArrayList<>();
List<Path> expectFilePaths = new ArrayList<>();
for (Result res : results) {
for (Cell cell : res.listCells()) {
byte[] referenceValue = CellUtil.cloneValue(cell);
String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
referenceValue.length - Bytes.SIZEOF_INT);
Path targetPath = new Path(mobFamilyPath, fileName);
if(!actualFilePaths.contains(targetPath)) {
actualFilePaths.add(targetPath);
}
}
}
results.close();
if (fs.exists(mobFamilyPath)) {
FileStatus[] files = fs.listStatus(mobFamilyPath);
for (FileStatus file : files) {
if (!StoreFileInfo.isDelFile(file.getPath())) {
expectFilePaths.add(file.getPath());
}
}
}
Collections.sort(actualFilePaths);
Collections.sort(expectFilePaths);
assertEquals(expectFilePaths, actualFilePaths);
}
/**
* Resets the configuration.
*/

View File

@ -131,7 +131,7 @@ public class TestPartitionedMobFileCompactor {
expectedStartKeys.add(startKey);
}
}
testSelectFiles(tableName, CompactionType.ALL_FILES, expectedStartKeys);
testSelectFiles(tableName, CompactionType.ALL_FILES, false, expectedStartKeys);
}
@Test
@ -156,7 +156,30 @@ public class TestPartitionedMobFileCompactor {
}
// set the mob file compaction mergeable threshold
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
testSelectFiles(tableName, CompactionType.PART_FILES, expectedStartKeys);
testSelectFiles(tableName, CompactionType.PART_FILES, false, expectedStartKeys);
}
@Test
public void testCompactionSelectWithForceAllFiles() throws Exception {
resetConf();
String tableName = "testCompactionSelectWithForceAllFiles";
init(tableName);
int count = 10;
// create 10 mob files.
createStoreFiles(basePath, family, qf, count, Type.Put);
// create 10 del files
createStoreFiles(basePath, family, qf, count, Type.Delete);
listFiles();
long mergeSize = 4000;
List<String> expectedStartKeys = new ArrayList<>();
for(FileStatus file : mobFiles) {
String fileName = file.getPath().getName();
String startKey = fileName.substring(0, 32);
expectedStartKeys.add(startKey);
}
// set the mob file compaction mergeable threshold
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
testSelectFiles(tableName, CompactionType.ALL_FILES, true, expectedStartKeys);
}
@Test
@ -169,7 +192,7 @@ public class TestPartitionedMobFileCompactor {
// create 13 del files
createStoreFiles(basePath, family, qf, 13, Type.Delete);
listFiles();
testCompactDelFiles(tableName, 1, 13);
testCompactDelFiles(tableName, 1, 13, false);
}
@Test
@ -185,7 +208,7 @@ public class TestPartitionedMobFileCompactor {
// set the mob file compaction batch size
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4);
testCompactDelFiles(tableName, 1, 13);
testCompactDelFiles(tableName, 1, 13, false);
}
@Test
@ -203,7 +226,7 @@ public class TestPartitionedMobFileCompactor {
conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
// set the mob file compaction batch size
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2);
testCompactDelFiles(tableName, 4, 13);
testCompactDelFiles(tableName, 4, 13, false);
}
/**
@ -213,16 +236,17 @@ public class TestPartitionedMobFileCompactor {
* @param expected the expected start keys
*/
private void testSelectFiles(String tableName, final CompactionType type,
final List<String> expected) throws IOException {
final boolean isForceAllFiles, final List<String> expected) throws IOException {
PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
TableName.valueOf(tableName), hcd, pool) {
@Override
public List<Path> compact(List<FileStatus> files) throws IOException {
public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
throws IOException {
if (files == null || files.isEmpty()) {
return null;
}
PartitionedMobFileCompactionRequest request = select(files);
// assert the compaction type is ALL_FILES
PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
// assert the compaction type
Assert.assertEquals(type, request.type);
// assert get the right partitions
compareCompactedPartitions(expected, request.compactionPartitions);
@ -231,7 +255,7 @@ public class TestPartitionedMobFileCompactor {
return null;
}
};
compactor.compact(allFiles);
compactor.compact(allFiles, isForceAllFiles);
}
/**
@ -241,7 +265,7 @@ public class TestPartitionedMobFileCompactor {
* @param expectedCellCount the expected cell count
*/
private void testCompactDelFiles(String tableName, final int expectedFileCount,
final int expectedCellCount) throws IOException {
final int expectedCellCount, boolean isForceAllFiles) throws IOException {
PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
TableName.valueOf(tableName), hcd, pool) {
@Override
@ -258,8 +282,7 @@ public class TestPartitionedMobFileCompactor {
return null;
}
};
compactor.compact(allFiles);
compactor.compact(allFiles, isForceAllFiles);
}
/**

View File

@ -938,5 +938,27 @@ module Hbase
@admin.deleteNamespace(namespace_name)
end
#----------------------------------------------------------------------------------------------
# Requests a mob file compaction
def compact_mob(table_name, family = nil)
if family == nil
@admin.compactMob(org.apache.hadoop.hbase.TableName.valueOf(table_name))
else
# We are compacting a mob column family within a table.
@admin.compactMob(org.apache.hadoop.hbase.TableName.valueOf(table_name), family.to_java_bytes)
end
end
#----------------------------------------------------------------------------------------------
# Requests a mob file major compaction
def major_compact_mob(table_name, family = nil)
if family == nil
@admin.majorCompactMob(org.apache.hadoop.hbase.TableName.valueOf(table_name))
else
# We are major compacting a mob column family within a table.
@admin.majorCompactMob(org.apache.hadoop.hbase.TableName.valueOf(table_name), family.to_java_bytes)
end
end
end
end

View File

@ -327,6 +327,8 @@ Shell.load_command_group(
catalogjanitor_enabled
compact_rs
trace
compact_mob
major_compact_mob
],
# TODO remove older hlog_roll command
:aliases => {

View File

@ -0,0 +1,42 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class CompactMob < Command
def help
return <<-EOF
Run compaction on a mob enabled column family
or all mob enabled column families within a table
Examples:
Compact a column family within a table:
hbase> compact_mob 't1', 'c1'
Compact all mob enabled column families within a table:
hbase> compact_mob 't1'
EOF
end
def command(table_name, family = nil)
format_simple_command do
admin.compact_mob(table_name, family)
end
end
end
end
end

View File

@ -0,0 +1,42 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class MajorCompactMob < Command
def help
return <<-EOF
Run major compaction on a mob enabled column family
or all mob enabled column families within a table
Examples:
Major compact a column family within a table:
hbase> major_compact_mob 't1', 'c1'
Major compact all mob enabled column families within a table:
hbase> major_compact_mob 't1'
EOF
end
def command(table_name, family = nil)
format_simple_command do
admin.major_compact_mob(table_name, family)
end
end
end
end
end