HBASE-13763 Handle the rename, annotation and typo stuff in MOB. (Jingcheng)
This commit is contained in:
parent
6388b3baf6
commit
b31a6acf4c
|
@ -1461,7 +1461,7 @@ public interface Admin extends Abortable, Closeable {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void compactMob(final TableName tableName) throws IOException,
|
||||
void compactMobs(final TableName tableName) throws IOException,
|
||||
InterruptedException;
|
||||
|
||||
/**
|
||||
|
@ -1482,7 +1482,7 @@ public interface Admin extends Abortable, Closeable {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void majorCompactMob(final TableName tableName) throws IOException,
|
||||
void majorCompactMobs(final TableName tableName) throws IOException,
|
||||
InterruptedException;
|
||||
|
||||
/**
|
||||
|
|
|
@ -4052,7 +4052,7 @@ public class HBaseAdmin implements Admin {
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void compactMob(final TableName tableName) throws IOException, InterruptedException {
|
||||
public void compactMobs(final TableName tableName) throws IOException, InterruptedException {
|
||||
checkTableNameNotNull(tableName);
|
||||
compactMob(tableName, null, false);
|
||||
}
|
||||
|
@ -4073,7 +4073,7 @@ public class HBaseAdmin implements Admin {
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void majorCompactMob(final TableName tableName) throws IOException, InterruptedException {
|
||||
public void majorCompactMobs(final TableName tableName) throws IOException, InterruptedException {
|
||||
checkTableNameNotNull(tableName);
|
||||
compactMob(tableName, null, true);
|
||||
}
|
||||
|
|
|
@ -1605,54 +1605,54 @@ possible configurations would overwhelm and obscure the important.
|
|||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.mob.file.compaction.mergeable.threshold</name>
|
||||
<name>hbase.mob.compaction.mergeable.threshold</name>
|
||||
<value>201326592</value>
|
||||
<description>
|
||||
If the size of a mob file is less than this value, it's regarded as a small
|
||||
file and needs to be merged in mob file compaction. The default value is 192MB.
|
||||
file and needs to be merged in mob compaction. The default value is 192MB.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.mob.delfile.max.count</name>
|
||||
<value>3</value>
|
||||
<description>
|
||||
The max number of del files that is allowed in the mob file compaction.
|
||||
In the mob file compaction, when the number of existing del files is larger than
|
||||
The max number of del files that is allowed in the mob compaction.
|
||||
In the mob compaction, when the number of existing del files is larger than
|
||||
this value, they are merged until number of del files is not larger this value.
|
||||
The default value is 3.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.mob.file.compaction.batch.size</name>
|
||||
<name>hbase.mob.compaction.batch.size</name>
|
||||
<value>100</value>
|
||||
<description>
|
||||
The max number of the mob files that is allowed in a batch of the mob file compaction.
|
||||
The mob file compaction merges the small mob files to bigger ones. If the number of the
|
||||
The max number of the mob files that is allowed in a batch of the mob compaction.
|
||||
The mob compaction merges the small mob files to bigger ones. If the number of the
|
||||
small files is very large, it could lead to a "too many opened file handlers" in the merge.
|
||||
And the merge has to be split into batches. This value limits the number of mob files
|
||||
that are selected in a batch of the mob file compaction. The default value is 100.
|
||||
that are selected in a batch of the mob compaction. The default value is 100.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.mob.file.compaction.chore.period</name>
|
||||
<name>hbase.mob.compaction.chore.period</name>
|
||||
<value>604800</value>
|
||||
<description>
|
||||
The period that MobFileCompactionChore runs. The unit is second.
|
||||
The period that MobCompactionChore runs. The unit is second.
|
||||
The default value is one week.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.mob.file.compactor.class</name>
|
||||
<value>org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor</value>
|
||||
<name>hbase.mob.compactor.class</name>
|
||||
<value>org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor</value>
|
||||
<description>
|
||||
Implementation of mob file compactor, the default one is PartitionedMobFileCompactor.
|
||||
Implementation of mob compactor, the default one is PartitionedMobCompactor.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.mob.file.compaction.threads.max</name>
|
||||
<name>hbase.mob.compaction.threads.max</name>
|
||||
<value>1</value>
|
||||
<description>
|
||||
The max number of threads used in MobFileCompactor.
|
||||
The max number of threads used in MobCompactor.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
|
|
@ -267,17 +267,17 @@ public interface MetricsRegionServerSource extends BaseSource {
|
|||
String MAJOR_COMPACTED_CELLS_SIZE = "majorCompactedCellsSize";
|
||||
String MAJOR_COMPACTED_CELLS_SIZE_DESC =
|
||||
"The total amount of data processed during major compactions, in bytes";
|
||||
String MOB_COMPACTED_INTO_MOB_CELLS_COUNT = "mobCompactedIntoMobCellsCount";
|
||||
String MOB_COMPACTED_INTO_MOB_CELLS_COUNT_DESC =
|
||||
String CELLS_COUNT_COMPACTED_TO_MOB = "cellsCountCompactedToMob";
|
||||
String CELLS_COUNT_COMPACTED_TO_MOB_DESC =
|
||||
"The number of cells moved to mob during compaction";
|
||||
String MOB_COMPACTED_FROM_MOB_CELLS_COUNT = "mobCompactedFromMobCellsCount";
|
||||
String MOB_COMPACTED_FROM_MOB_CELLS_COUNT_DESC =
|
||||
String CELLS_COUNT_COMPACTED_FROM_MOB = "cellsCountCompactedFromMob";
|
||||
String CELLS_COUNT_COMPACTED_FROM_MOB_DESC =
|
||||
"The number of cells moved from mob during compaction";
|
||||
String MOB_COMPACTED_INTO_MOB_CELLS_SIZE = "mobCompactedIntoMobCellsSize";
|
||||
String MOB_COMPACTED_INTO_MOB_CELLS_SIZE_DESC =
|
||||
String CELLS_SIZE_COMPACTED_TO_MOB = "cellsSizeCompactedToMob";
|
||||
String CELLS_SIZE_COMPACTED_TO_MOB_DESC =
|
||||
"The total amount of cells move to mob during compaction, in bytes";
|
||||
String MOB_COMPACTED_FROM_MOB_CELLS_SIZE = "mobCompactedFromMobCellsSize";
|
||||
String MOB_COMPACTED_FROM_MOB_CELLS_SIZE_DESC =
|
||||
String CELLS_SIZE_COMPACTED_FROM_MOB = "cellsSizeCompactedFromMob";
|
||||
String CELLS_SIZE_COMPACTED_FROM_MOB_DESC =
|
||||
"The total amount of cells move from mob during compaction, in bytes";
|
||||
String MOB_FLUSH_COUNT = "mobFlushCount";
|
||||
String MOB_FLUSH_COUNT_DESC = "The number of the flushes in mob-enabled stores";
|
||||
|
|
|
@ -258,24 +258,24 @@ public interface MetricsRegionServerWrapper {
|
|||
long getMajorCompactedCellsSize();
|
||||
|
||||
/**
|
||||
* Gets the number of cells move to mob during compaction.
|
||||
* Gets the number of cells moved to mob during compaction.
|
||||
*/
|
||||
long getMobCompactedIntoMobCellsCount();
|
||||
long getCellsCountCompactedToMob();
|
||||
|
||||
/**
|
||||
* Gets the number of cells move from mob during compaction.
|
||||
* Gets the number of cells moved from mob during compaction.
|
||||
*/
|
||||
long getMobCompactedFromMobCellsCount();
|
||||
long getCellsCountCompactedFromMob();
|
||||
|
||||
/**
|
||||
* Gets the total amount of cells move to mob during compaction, in bytes.
|
||||
* Gets the total amount of cells moved to mob during compaction, in bytes.
|
||||
*/
|
||||
long getMobCompactedIntoMobCellsSize();
|
||||
long getCellsSizeCompactedToMob();
|
||||
|
||||
/**
|
||||
* Gets the total amount of cells move from mob during compaction, in bytes.
|
||||
* Gets the total amount of cells moved from mob during compaction, in bytes.
|
||||
*/
|
||||
long getMobCompactedFromMobCellsSize();
|
||||
long getCellsSizeCompactedFromMob();
|
||||
|
||||
/**
|
||||
* Gets the number of the flushes in mob-enabled stores.
|
||||
|
|
|
@ -259,14 +259,14 @@ public class MetricsRegionServerSourceImpl
|
|||
.addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
|
||||
rsWrap.getMajorCompactedCellsSize())
|
||||
|
||||
.addCounter(Interns.info(MOB_COMPACTED_FROM_MOB_CELLS_COUNT, MOB_COMPACTED_FROM_MOB_CELLS_COUNT_DESC),
|
||||
rsWrap.getMobCompactedFromMobCellsCount())
|
||||
.addCounter(Interns.info(MOB_COMPACTED_INTO_MOB_CELLS_COUNT, MOB_COMPACTED_INTO_MOB_CELLS_COUNT_DESC),
|
||||
rsWrap.getMobCompactedIntoMobCellsCount())
|
||||
.addCounter(Interns.info(MOB_COMPACTED_FROM_MOB_CELLS_SIZE, MOB_COMPACTED_FROM_MOB_CELLS_SIZE_DESC),
|
||||
rsWrap.getMobCompactedFromMobCellsSize())
|
||||
.addCounter(Interns.info(MOB_COMPACTED_INTO_MOB_CELLS_SIZE, MOB_COMPACTED_INTO_MOB_CELLS_SIZE_DESC),
|
||||
rsWrap.getMobCompactedIntoMobCellsSize())
|
||||
.addCounter(Interns.info(CELLS_COUNT_COMPACTED_FROM_MOB, CELLS_COUNT_COMPACTED_FROM_MOB_DESC),
|
||||
rsWrap.getCellsCountCompactedFromMob())
|
||||
.addCounter(Interns.info(CELLS_COUNT_COMPACTED_TO_MOB, CELLS_COUNT_COMPACTED_TO_MOB_DESC),
|
||||
rsWrap.getCellsCountCompactedToMob())
|
||||
.addCounter(Interns.info(CELLS_SIZE_COMPACTED_FROM_MOB, CELLS_SIZE_COMPACTED_FROM_MOB_DESC),
|
||||
rsWrap.getCellsSizeCompactedFromMob())
|
||||
.addCounter(Interns.info(CELLS_SIZE_COMPACTED_TO_MOB, CELLS_SIZE_COMPACTED_TO_MOB_DESC),
|
||||
rsWrap.getCellsSizeCompactedToMob())
|
||||
.addCounter(Interns.info(MOB_FLUSH_COUNT, MOB_FLUSH_COUNT_DESC),
|
||||
rsWrap.getMobFlushCount())
|
||||
.addCounter(Interns.info(MOB_FLUSHED_CELLS_COUNT, MOB_FLUSHED_CELLS_COUNT_DESC),
|
||||
|
|
|
@ -51,9 +51,9 @@ public class CompactMobAction extends Action {
|
|||
LOG.info("Performing action: Compact mob of table " + tableName + ", major=" + major);
|
||||
try {
|
||||
if (major) {
|
||||
admin.majorCompactMob(tableName);
|
||||
admin.majorCompactMobs(tableName);
|
||||
} else {
|
||||
admin.compactMob(tableName);
|
||||
admin.compactMobs(tableName);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Mob Compaction failed, might be caused by other chaos: " + ex.getMessage());
|
||||
|
|
|
@ -184,7 +184,9 @@ public class HFileLink extends FileLink {
|
|||
/**
|
||||
* @return the path of the mob hfiles.
|
||||
*/
|
||||
public Path getMobPath() { return this.mobPath; }
|
||||
public Path getMobPath() {
|
||||
return this.mobPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param path Path to check.
|
||||
|
|
|
@ -92,7 +92,7 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore {
|
|||
lock.release();
|
||||
} catch (IOException e) {
|
||||
LOG.error(
|
||||
"Fail to release the write lock for the table " + htd.getNameAsString(), e);
|
||||
"Fail to release the read lock for the table " + htd.getNameAsString(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -110,7 +110,6 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
|||
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
|
||||
|
@ -280,13 +279,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
private LogCleaner logCleaner;
|
||||
private HFileCleaner hfileCleaner;
|
||||
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
|
||||
private MobFileCompactionChore mobFileCompactChore;
|
||||
MasterMobFileCompactionThread mobFileCompactThread;
|
||||
// used to synchronize the mobFileCompactionStates
|
||||
private final IdLock mobFileCompactionLock = new IdLock();
|
||||
// save the information of mob file compactions in tables.
|
||||
private MobCompactionChore mobCompactChore;
|
||||
private MasterMobCompactionThread mobCompactThread;
|
||||
// used to synchronize the mobCompactionStates
|
||||
private final IdLock mobCompactionLock = new IdLock();
|
||||
// save the information of mob compactions in tables.
|
||||
// the key is table name, the value is the number of compactions in that table.
|
||||
private Map<TableName, AtomicInteger> mobFileCompactionStates = Maps.newConcurrentMap();
|
||||
private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();
|
||||
|
||||
MasterCoprocessorHost cpHost;
|
||||
|
||||
|
@ -796,9 +795,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
|
||||
getChoreService().scheduleChore(expiredMobFileCleanerChore);
|
||||
|
||||
this.mobFileCompactChore = new MobFileCompactionChore(this);
|
||||
getChoreService().scheduleChore(mobFileCompactChore);
|
||||
this.mobFileCompactThread = new MasterMobFileCompactionThread(this);
|
||||
this.mobCompactChore = new MobCompactionChore(this);
|
||||
getChoreService().scheduleChore(mobCompactChore);
|
||||
this.mobCompactThread = new MasterMobCompactionThread(this);
|
||||
|
||||
if (this.cpHost != null) {
|
||||
// don't let cp initialization errors kill the master
|
||||
|
@ -1134,8 +1133,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
if (this.expiredMobFileCleanerChore != null) {
|
||||
this.expiredMobFileCleanerChore.cancel(true);
|
||||
}
|
||||
if (this.mobFileCompactChore != null) {
|
||||
this.mobFileCompactChore.cancel(true);
|
||||
if (this.mobCompactChore != null) {
|
||||
this.mobCompactChore.cancel(true);
|
||||
}
|
||||
if (this.balancerChore != null) {
|
||||
this.balancerChore.cancel(true);
|
||||
|
@ -1149,8 +1148,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
if (this.clusterStatusPublisherChore != null){
|
||||
clusterStatusPublisherChore.cancel(true);
|
||||
}
|
||||
if (this.mobFileCompactThread != null) {
|
||||
this.mobFileCompactThread.close();
|
||||
if (this.mobCompactThread != null) {
|
||||
this.mobCompactThread.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2453,49 +2452,61 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
* @return If a given table is in mob file compaction now.
|
||||
*/
|
||||
public CompactionState getMobCompactionState(TableName tableName) {
|
||||
AtomicInteger compactionsCount = mobFileCompactionStates.get(tableName);
|
||||
AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
|
||||
if (compactionsCount != null && compactionsCount.get() != 0) {
|
||||
return CompactionState.MAJOR_AND_MINOR;
|
||||
}
|
||||
return CompactionState.NONE;
|
||||
}
|
||||
|
||||
public void reportMobFileCompactionStart(TableName tableName) throws IOException {
|
||||
public void reportMobCompactionStart(TableName tableName) throws IOException {
|
||||
IdLock.Entry lockEntry = null;
|
||||
try {
|
||||
lockEntry = mobFileCompactionLock.getLockEntry(tableName.hashCode());
|
||||
AtomicInteger compactionsCount = mobFileCompactionStates.get(tableName);
|
||||
lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
|
||||
AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
|
||||
if (compactionsCount == null) {
|
||||
compactionsCount = new AtomicInteger(0);
|
||||
mobFileCompactionStates.put(tableName, compactionsCount);
|
||||
mobCompactionStates.put(tableName, compactionsCount);
|
||||
}
|
||||
compactionsCount.incrementAndGet();
|
||||
} finally {
|
||||
if (lockEntry != null) {
|
||||
mobFileCompactionLock.releaseLockEntry(lockEntry);
|
||||
mobCompactionLock.releaseLockEntry(lockEntry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void reportMobFileCompactionEnd(TableName tableName) throws IOException {
|
||||
public void reportMobCompactionEnd(TableName tableName) throws IOException {
|
||||
IdLock.Entry lockEntry = null;
|
||||
try {
|
||||
lockEntry = mobFileCompactionLock.getLockEntry(tableName.hashCode());
|
||||
AtomicInteger compactionsCount = mobFileCompactionStates.get(tableName);
|
||||
lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
|
||||
AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
|
||||
if (compactionsCount != null) {
|
||||
int count = compactionsCount.decrementAndGet();
|
||||
// remove the entry if the count is 0.
|
||||
if (count == 0) {
|
||||
mobFileCompactionStates.remove(tableName);
|
||||
mobCompactionStates.remove(tableName);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (lockEntry != null) {
|
||||
mobFileCompactionLock.releaseLockEntry(lockEntry);
|
||||
mobCompactionLock.releaseLockEntry(lockEntry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests mob compaction.
|
||||
* @param tableName The table the compact.
|
||||
* @param columns The compacted columns.
|
||||
* @param allFiles Whether add all mob files into the compaction.
|
||||
*/
|
||||
public void requestMobCompaction(TableName tableName,
|
||||
List<HColumnDescriptor> columns, boolean allFiles) throws IOException {
|
||||
mobCompactThread.requestMobCompaction(conf, fs, tableName, columns,
|
||||
tableLockManager, allFiles);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
|
||||
* false is returned.
|
||||
|
|
|
@ -38,65 +38,65 @@ import org.apache.hadoop.hbase.mob.MobUtils;
|
|||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
/**
|
||||
* The mob file compaction thread used in {@link MasterRpcServices}
|
||||
* The mob compaction thread used in {@link MasterRpcServices}
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MasterMobFileCompactionThread {
|
||||
static final Log LOG = LogFactory.getLog(MasterMobFileCompactionThread.class);
|
||||
public class MasterMobCompactionThread {
|
||||
static final Log LOG = LogFactory.getLog(MasterMobCompactionThread.class);
|
||||
private final HMaster master;
|
||||
private final Configuration conf;
|
||||
private final ExecutorService mobFileCompactorPool;
|
||||
private final ExecutorService mobCompactorPool;
|
||||
private final ExecutorService masterMobPool;
|
||||
|
||||
public MasterMobFileCompactionThread(HMaster master) {
|
||||
public MasterMobCompactionThread(HMaster master) {
|
||||
this.master = master;
|
||||
this.conf = master.getConfiguration();
|
||||
final String n = Thread.currentThread().getName();
|
||||
// this pool is used to run the mob file compaction
|
||||
// this pool is used to run the mob compaction
|
||||
this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<Runnable>(), new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r);
|
||||
t.setName(n + "-MasterMobFileCompaction-" + EnvironmentEdgeManager.currentTime());
|
||||
t.setName(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime());
|
||||
return t;
|
||||
}
|
||||
});
|
||||
((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
|
||||
// this pool is used in the mob file compaction to compact the mob files by partitions
|
||||
// this pool is used in the mob compaction to compact the mob files by partitions
|
||||
// in parallel
|
||||
this.mobFileCompactorPool = MobUtils
|
||||
.createMobFileCompactorThreadPool(master.getConfiguration());
|
||||
this.mobCompactorPool = MobUtils
|
||||
.createMobCompactorThreadPool(master.getConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests mob file compaction
|
||||
* Requests mob compaction
|
||||
* @param conf The Configuration
|
||||
* @param fs The file system
|
||||
* @param tableName The table the compact
|
||||
* @param hcds The column descriptors
|
||||
* @param columns The column descriptors
|
||||
* @param tableLockManager The tableLock manager
|
||||
* @param isForceAllFiles Whether add all mob files into the compaction.
|
||||
* @param allFiles Whether add all mob files into the compaction.
|
||||
*/
|
||||
public void requestMobFileCompaction(Configuration conf, FileSystem fs, TableName tableName,
|
||||
List<HColumnDescriptor> hcds, TableLockManager tableLockManager, boolean isForceAllFiles)
|
||||
public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
|
||||
List<HColumnDescriptor> columns, TableLockManager tableLockManager, boolean allFiles)
|
||||
throws IOException {
|
||||
master.reportMobFileCompactionStart(tableName);
|
||||
master.reportMobCompactionStart(tableName);
|
||||
try {
|
||||
masterMobPool.execute(new CompactionRunner(fs, tableName, hcds, tableLockManager,
|
||||
isForceAllFiles, mobFileCompactorPool));
|
||||
masterMobPool.execute(new CompactionRunner(fs, tableName, columns, tableLockManager,
|
||||
allFiles, mobCompactorPool));
|
||||
} catch (RejectedExecutionException e) {
|
||||
// in case the request is rejected by the pool
|
||||
try {
|
||||
master.reportMobFileCompactionEnd(tableName);
|
||||
master.reportMobCompactionEnd(tableName);
|
||||
} catch (IOException e1) {
|
||||
LOG.error("Failed to mark end of mob file compation", e1);
|
||||
LOG.error("Failed to mark end of mob compation", e1);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The mob file compaction is requested for the columns " + hcds + " of the table "
|
||||
+ tableName.getNameAsString());
|
||||
LOG.debug("The mob compaction is requested for the columns " + columns
|
||||
+ " of the table " + tableName.getNameAsString());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,17 +105,17 @@ public class MasterMobFileCompactionThread {
|
|||
private TableName tableName;
|
||||
private List<HColumnDescriptor> hcds;
|
||||
private TableLockManager tableLockManager;
|
||||
private boolean isForceAllFiles;
|
||||
private boolean allFiles;
|
||||
private ExecutorService pool;
|
||||
|
||||
public CompactionRunner(FileSystem fs, TableName tableName, List<HColumnDescriptor> hcds,
|
||||
TableLockManager tableLockManager, boolean isForceAllFiles, ExecutorService pool) {
|
||||
TableLockManager tableLockManager, boolean allFiles, ExecutorService pool) {
|
||||
super();
|
||||
this.fs = fs;
|
||||
this.tableName = tableName;
|
||||
this.hcds = hcds;
|
||||
this.tableLockManager = tableLockManager;
|
||||
this.isForceAllFiles = isForceAllFiles;
|
||||
this.allFiles = allFiles;
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
|
@ -123,16 +123,16 @@ public class MasterMobFileCompactionThread {
|
|||
public void run() {
|
||||
try {
|
||||
for (HColumnDescriptor hcd : hcds) {
|
||||
MobUtils.doMobFileCompaction(conf, fs, tableName, hcd, pool, tableLockManager,
|
||||
isForceAllFiles);
|
||||
MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, tableLockManager,
|
||||
allFiles);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to perform the mob file compaction", e);
|
||||
LOG.error("Failed to perform the mob compaction", e);
|
||||
} finally {
|
||||
try {
|
||||
master.reportMobFileCompactionEnd(tableName);
|
||||
master.reportMobCompactionEnd(tableName);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to mark end of mob file compation", e);
|
||||
LOG.error("Failed to mark end of mob compation", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -142,7 +142,7 @@ public class MasterMobFileCompactionThread {
|
|||
* Only interrupt once it's done with a run through the work loop.
|
||||
*/
|
||||
private void interruptIfNecessary() {
|
||||
mobFileCompactorPool.shutdown();
|
||||
mobCompactorPool.shutdown();
|
||||
masterMobPool.shutdown();
|
||||
}
|
||||
|
||||
|
@ -150,12 +150,12 @@ public class MasterMobFileCompactionThread {
|
|||
* Wait for all the threads finish.
|
||||
*/
|
||||
private void join() {
|
||||
waitFor(mobFileCompactorPool, "Mob file Compaction Thread");
|
||||
waitFor(masterMobPool, "Region Server Mob File Compaction Thread");
|
||||
waitFor(mobCompactorPool, "Mob Compaction Thread");
|
||||
waitFor(masterMobPool, "Region Server Mob Compaction Thread");
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the MasterMobFileCompactionThread.
|
||||
* Closes the MasterMobCompactionThread.
|
||||
*/
|
||||
public void close() {
|
||||
interruptIfNecessary();
|
|
@ -1428,7 +1428,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
if (!master.getTableStateManager().isTableState(tableName, TableState.State.ENABLED)) {
|
||||
throw new DoNotRetryIOException("Table " + tableName + " is not enabled");
|
||||
}
|
||||
boolean isForceAllFiles = false;
|
||||
boolean allFiles = false;
|
||||
List<HColumnDescriptor> compactedColumns = new ArrayList<HColumnDescriptor>();
|
||||
HColumnDescriptor[] hcds = master.getTableDescriptors().get(tableName).getColumnFamilies();
|
||||
byte[] family = null;
|
||||
|
@ -1437,8 +1437,8 @@ public class MasterRpcServices extends RSRpcServices
|
|||
for (HColumnDescriptor hcd : hcds) {
|
||||
if (Bytes.equals(family, hcd.getName())) {
|
||||
if (!hcd.isMobEnabled()) {
|
||||
LOG.error("Column family " + hcd.getName() + " is not a mob column family");
|
||||
throw new DoNotRetryIOException("Column family " + hcd.getName()
|
||||
LOG.error("Column family " + hcd.getNameAsString() + " is not a mob column family");
|
||||
throw new DoNotRetryIOException("Column family " + hcd.getNameAsString()
|
||||
+ " is not a mob column family");
|
||||
}
|
||||
compactedColumns.add(hcd);
|
||||
|
@ -1452,21 +1452,19 @@ public class MasterRpcServices extends RSRpcServices
|
|||
}
|
||||
}
|
||||
if (compactedColumns.isEmpty()) {
|
||||
LOG.error("No mob column families are assigned in the mob file compaction");
|
||||
LOG.error("No mob column families are assigned in the mob compaction");
|
||||
throw new DoNotRetryIOException(
|
||||
"No mob column families are assigned in the mob file compaction");
|
||||
"No mob column families are assigned in the mob compaction");
|
||||
}
|
||||
if (request.hasMajor() && request.getMajor()) {
|
||||
isForceAllFiles = true;
|
||||
allFiles = true;
|
||||
}
|
||||
String familyLogMsg = (family != null) ? Bytes.toString(family) : "";
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("User-triggered mob file compaction requested for table: "
|
||||
LOG.trace("User-triggered mob compaction requested for table: "
|
||||
+ tableName.getNameAsString() + " for column family: " + familyLogMsg);
|
||||
}
|
||||
master.mobFileCompactThread.requestMobFileCompaction(master.getConfiguration(),
|
||||
master.getFileSystem(), tableName, compactedColumns,
|
||||
master.getTableLockManager(), isForceAllFiles);
|
||||
master.requestMobCompaction(tableName, compactedColumns, allFiles);
|
||||
return CompactRegionResponse.newBuilder().build();
|
||||
}
|
||||
|
||||
|
|
|
@ -34,25 +34,25 @@ import org.apache.hadoop.hbase.mob.MobConstants;
|
|||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
|
||||
/**
|
||||
* The Class MobFileCompactChore for running compaction regularly to merge small mob files.
|
||||
* The Class MobCompactChore for running compaction regularly to merge small mob files.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MobFileCompactionChore extends ScheduledChore {
|
||||
public class MobCompactionChore extends ScheduledChore {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MobFileCompactionChore.class);
|
||||
private static final Log LOG = LogFactory.getLog(MobCompactionChore.class);
|
||||
private HMaster master;
|
||||
private TableLockManager tableLockManager;
|
||||
private ExecutorService pool;
|
||||
|
||||
public MobFileCompactionChore(HMaster master) {
|
||||
super(master.getServerName() + "-MobFileCompactChore", master, master.getConfiguration()
|
||||
.getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD,
|
||||
MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD), master.getConfiguration().getInt(
|
||||
MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD,
|
||||
MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD), TimeUnit.SECONDS);
|
||||
public MobCompactionChore(HMaster master) {
|
||||
super(master.getServerName() + "-MobCompactionChore", master, master.getConfiguration()
|
||||
.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
|
||||
MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD), master.getConfiguration().getInt(
|
||||
MobConstants.MOB_COMPACTION_CHORE_PERIOD,
|
||||
MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD), TimeUnit.SECONDS);
|
||||
this.master = master;
|
||||
this.tableLockManager = master.getTableLockManager();
|
||||
this.pool = MobUtils.createMobFileCompactorThreadPool(master.getConfiguration());
|
||||
this.pool = MobUtils.createMobCompactorThreadPool(master.getConfiguration());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -72,20 +72,20 @@ public class MobFileCompactionChore extends ScheduledChore {
|
|||
continue;
|
||||
}
|
||||
if (!reported) {
|
||||
master.reportMobFileCompactionStart(htd.getTableName());
|
||||
master.reportMobCompactionStart(htd.getTableName());
|
||||
reported = true;
|
||||
}
|
||||
MobUtils.doMobFileCompaction(master.getConfiguration(), master.getFileSystem(),
|
||||
MobUtils.doMobCompaction(master.getConfiguration(), master.getFileSystem(),
|
||||
htd.getTableName(), hcd, pool, tableLockManager, false);
|
||||
}
|
||||
} finally {
|
||||
if (reported) {
|
||||
master.reportMobFileCompactionEnd(htd.getTableName());
|
||||
master.reportMobCompactionEnd(htd.getTableName());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Fail to clean the expired mob files", e);
|
||||
LOG.error("Failed to compact mob files", e);
|
||||
}
|
||||
}
|
||||
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
|
|||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.master.RegionStates;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
@ -73,8 +74,8 @@ public class DeleteTableHandler extends TableEventHandler {
|
|||
long waitTime = server.getConfiguration().
|
||||
getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
|
||||
for (HRegionInfo region : regions) {
|
||||
long done = System.currentTimeMillis() + waitTime;
|
||||
while (System.currentTimeMillis() < done) {
|
||||
long done = EnvironmentEdgeManager.currentTime() + waitTime;
|
||||
while (EnvironmentEdgeManager.currentTime() < done) {
|
||||
if (states.isRegionInState(region, State.FAILED_OPEN)) {
|
||||
am.regionOffline(region);
|
||||
}
|
||||
|
@ -192,14 +193,7 @@ public class DeleteTableHandler extends TableEventHandler {
|
|||
}
|
||||
|
||||
// Archive the mob data if there is a mob-enabled column
|
||||
HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies();
|
||||
boolean hasMob = false;
|
||||
for (HColumnDescriptor hcd : hcds) {
|
||||
if (hcd.isMobEnabled()) {
|
||||
hasMob = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
boolean hasMob = MobUtils.hasMobColumns(hTableDescriptor);
|
||||
Path mobTableDir = null;
|
||||
if (hasMob) {
|
||||
// Archive mob data
|
||||
|
|
|
@ -344,14 +344,7 @@ public class DeleteTableProcedure
|
|||
|
||||
// Archive the mob data if there is a mob-enabled column
|
||||
HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(tableName);
|
||||
HColumnDescriptor[] hcds = htd.getColumnFamilies();
|
||||
boolean hasMob = false;
|
||||
for (HColumnDescriptor hcd : hcds) {
|
||||
if (hcd.isMobEnabled()) {
|
||||
hasMob = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
boolean hasMob = MobUtils.hasMobColumns(htd);
|
||||
Path mobTableDir = null;
|
||||
if (hasMob) {
|
||||
// Archive mob data
|
||||
|
|
|
@ -44,12 +44,12 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
* Compact passed set of files in the mob-enabled column family.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DefaultMobCompactor extends DefaultCompactor {
|
||||
public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class);
|
||||
private static final Log LOG = LogFactory.getLog(DefaultMobStoreCompactor.class);
|
||||
private long mobSizeThreshold;
|
||||
private HMobStore mobStore;
|
||||
public DefaultMobCompactor(Configuration conf, Store store) {
|
||||
public DefaultMobStoreCompactor(Configuration conf, Store store) {
|
||||
super(conf, store);
|
||||
// The mob cells reside in the mob-enabled column family which is held by HMobStore.
|
||||
// During the compaction, the compactor reads the cells from the mob files and
|
||||
|
@ -83,6 +83,8 @@ public class DefaultMobCompactor extends DefaultCompactor {
|
|||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(store.getFamily().getMaxVersions());
|
||||
if (scanType == ScanType.COMPACT_DROP_DELETES) {
|
||||
// In major compaction, we need to write the delete markers to del files, so we have to
|
||||
// retain the them in scanning.
|
||||
scanType = ScanType.COMPACT_RETAIN_DELETES;
|
||||
return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
|
||||
scanType, smallestReadPoint, earliestPutTs, true);
|
||||
|
@ -133,6 +135,7 @@ public class DefaultMobCompactor extends DefaultCompactor {
|
|||
* @param writer Where to write to.
|
||||
* @param smallestReadPoint Smallest read point.
|
||||
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
|
||||
* @param throughputController The compaction throughput controller.
|
||||
* @param major Is a major compaction.
|
||||
* @return Whether compaction ended; false if it was interrupted for any reason.
|
||||
*/
|
||||
|
@ -160,10 +163,10 @@ public class DefaultMobCompactor extends DefaultCompactor {
|
|||
long deleteMarkersCount = 0;
|
||||
Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
|
||||
.getName());
|
||||
long mobCompactedIntoMobCellsCount = 0;
|
||||
long mobCompactedFromMobCellsCount = 0;
|
||||
long mobCompactedIntoMobCellsSize = 0;
|
||||
long mobCompactedFromMobCellsSize = 0;
|
||||
long cellsCountCompactedToMob = 0;
|
||||
long cellsCountCompactedFromMob = 0;
|
||||
long cellsSizeCompactedToMob = 0;
|
||||
long cellsSizeCompactedFromMob = 0;
|
||||
try {
|
||||
try {
|
||||
// If the mob file writer could not be created, directly write the cell to the store file.
|
||||
|
@ -172,7 +175,7 @@ public class DefaultMobCompactor extends DefaultCompactor {
|
|||
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||
} catch (IOException e) {
|
||||
LOG.error(
|
||||
"Fail to create mob writer, "
|
||||
"Failed to create mob writer, "
|
||||
+ "we will continue the compaction by writing MOB cells directly in store files",
|
||||
e);
|
||||
}
|
||||
|
@ -180,8 +183,6 @@ public class DefaultMobCompactor extends DefaultCompactor {
|
|||
store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
|
||||
ScannerContext scannerContext =
|
||||
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
|
||||
|
||||
|
||||
do {
|
||||
hasMore = compactionScanner.next(cells, scannerContext);
|
||||
// output to writer:
|
||||
|
@ -211,8 +212,8 @@ public class DefaultMobCompactor extends DefaultCompactor {
|
|||
// put the mob data back to the store file
|
||||
CellUtil.setSequenceId(mobCell, c.getSequenceId());
|
||||
writer.append(mobCell);
|
||||
mobCompactedFromMobCellsCount++;
|
||||
mobCompactedFromMobCellsSize += mobCell.getValueLength();
|
||||
cellsCountCompactedFromMob++;
|
||||
cellsSizeCompactedFromMob += mobCell.getValueLength();
|
||||
} else {
|
||||
// If the value of a file is empty, there might be issues when retrieving,
|
||||
// directly write the cell to the store file, and leave it to be handled by the
|
||||
|
@ -238,8 +239,8 @@ public class DefaultMobCompactor extends DefaultCompactor {
|
|||
KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag);
|
||||
// write the cell whose value is the path of a mob file to the store file.
|
||||
writer.append(reference);
|
||||
mobCompactedIntoMobCellsCount++;
|
||||
mobCompactedIntoMobCellsSize += c.getValueLength();
|
||||
cellsCountCompactedToMob++;
|
||||
cellsSizeCompactedToMob += c.getValueLength();
|
||||
}
|
||||
++progress.currentCompactedKVs;
|
||||
|
||||
|
@ -276,7 +277,7 @@ public class DefaultMobCompactor extends DefaultCompactor {
|
|||
// If the mob file is empty, delete it instead of committing.
|
||||
store.getFileSystem().delete(mobFileWriter.getPath(), true);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Fail to delete the temp mob file", e);
|
||||
LOG.error("Failed to delete the temp mob file", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -290,14 +291,14 @@ public class DefaultMobCompactor extends DefaultCompactor {
|
|||
// If the del file is empty, delete it instead of committing.
|
||||
store.getFileSystem().delete(delFileWriter.getPath(), true);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Fail to delete the temp del file", e);
|
||||
LOG.error("Failed to delete the temp del file", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
mobStore.updateMobCompactedFromMobCellsCount(mobCompactedFromMobCellsCount);
|
||||
mobStore.updateMobCompactedIntoMobCellsCount(mobCompactedIntoMobCellsCount);
|
||||
mobStore.updateMobCompactedFromMobCellsSize(mobCompactedFromMobCellsSize);
|
||||
mobStore.updateMobCompactedIntoMobCellsSize(mobCompactedIntoMobCellsSize);
|
||||
mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
|
||||
mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
|
||||
mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
|
||||
mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
|
||||
progress.complete();
|
||||
return true;
|
||||
}
|
|
@ -121,7 +121,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
|||
} finally {
|
||||
scanner.close();
|
||||
}
|
||||
LOG.info("Flushed, sequenceid=" + cacheFlushId + ", memsize="
|
||||
LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
|
||||
+ StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
|
||||
", hasBloomFilter=" + writer.hasGeneralBloom() +
|
||||
", into tmp file " + writer.getPath());
|
||||
|
@ -213,7 +213,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
|||
// If the mob file is empty, delete it instead of committing.
|
||||
store.getFileSystem().delete(mobFileWriter.getPath(), true);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Fail to delete the temp mob file", e);
|
||||
LOG.error("Failed to delete the temp mob file", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -112,7 +112,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
|
|||
try {
|
||||
admin.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Fail to close the HBaseAdmin.", e);
|
||||
LOG.error("Failed to close the HBaseAdmin.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,43 +77,43 @@ public class MobConstants {
|
|||
public final static String EMPTY_STRING = "";
|
||||
/**
|
||||
* If the size of a mob file is less than this value, it's regarded as a small file and needs to
|
||||
* be merged in mob file compaction. The default value is 192MB.
|
||||
* be merged in mob compaction. The default value is 192MB.
|
||||
*/
|
||||
public static final String MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD =
|
||||
"hbase.mob.file.compaction.mergeable.threshold";
|
||||
public static final long DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD = 192 * 1024 * 1024;
|
||||
public static final String MOB_COMPACTION_MERGEABLE_THRESHOLD =
|
||||
"hbase.mob.compaction.mergeable.threshold";
|
||||
public static final long DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD = 192 * 1024 * 1024;
|
||||
/**
|
||||
* The max number of del files that is allowed in the mob file compaction. In the mob file
|
||||
* The max number of del files that is allowed in the mob file compaction. In the mob
|
||||
* compaction, when the number of existing del files is larger than this value, they are merged
|
||||
* until number of del files is not larger this value. The default value is 3.
|
||||
*/
|
||||
public static final String MOB_DELFILE_MAX_COUNT = "hbase.mob.delfile.max.count";
|
||||
public static final int DEFAULT_MOB_DELFILE_MAX_COUNT = 3;
|
||||
/**
|
||||
* The max number of the mob files that is allowed in a batch of the mob file compaction.
|
||||
* The mob file compaction merges the small mob files to bigger ones. If the number of the
|
||||
* The max number of the mob files that is allowed in a batch of the mob compaction.
|
||||
* The mob compaction merges the small mob files to bigger ones. If the number of the
|
||||
* small files is very large, it could lead to a "too many opened file handlers" in the merge.
|
||||
* And the merge has to be split into batches. This value limits the number of mob files
|
||||
* that are selected in a batch of the mob file compaction. The default value is 100.
|
||||
* that are selected in a batch of the mob compaction. The default value is 100.
|
||||
*/
|
||||
public static final String MOB_FILE_COMPACTION_BATCH_SIZE =
|
||||
"hbase.mob.file.compaction.batch.size";
|
||||
public static final int DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE = 100;
|
||||
public static final String MOB_COMPACTION_BATCH_SIZE =
|
||||
"hbase.mob.compaction.batch.size";
|
||||
public static final int DEFAULT_MOB_COMPACTION_BATCH_SIZE = 100;
|
||||
/**
|
||||
* The period that MobFileCompactionChore runs. The unit is millisecond.
|
||||
* The period that MobCompactionChore runs. The unit is second.
|
||||
* The default value is one week.
|
||||
*/
|
||||
public static final String MOB_FILE_COMPACTION_CHORE_PERIOD =
|
||||
"hbase.mob.file.compaction.chore.period";
|
||||
public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD =
|
||||
public static final String MOB_COMPACTION_CHORE_PERIOD =
|
||||
"hbase.mob.compaction.chore.period";
|
||||
public static final int DEFAULT_MOB_COMPACTION_CHORE_PERIOD =
|
||||
24 * 60 * 60 * 7; // a week
|
||||
public static final String MOB_FILE_COMPACTOR_CLASS_KEY = "hbase.mob.file.compactor.class";
|
||||
public static final String MOB_COMPACTOR_CLASS_KEY = "hbase.mob.compactor.class";
|
||||
/**
|
||||
* The max number of threads used in MobFileCompactor.
|
||||
* The max number of threads used in MobCompactor.
|
||||
*/
|
||||
public static final String MOB_FILE_COMPACTION_THREADS_MAX =
|
||||
"hbase.mob.file.compaction.threads.max";
|
||||
public static final int DEFAULT_MOB_FILE_COMPACTION_THREADS_MAX = 1;
|
||||
public static final String MOB_COMPACTION_THREADS_MAX =
|
||||
"hbase.mob.compaction.threads.max";
|
||||
public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1;
|
||||
private MobConstants() {
|
||||
|
||||
}
|
||||
|
|
|
@ -177,7 +177,7 @@ public class MobFileCache {
|
|||
evictedFileCount.incrementAndGet();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Fail to evict the file " + fileName, e);
|
||||
LOG.error("Failed to evict the file " + fileName, e);
|
||||
} finally {
|
||||
if (lockEntry != null) {
|
||||
keyLock.releaseLockEntry(lockEntry);
|
||||
|
@ -249,7 +249,7 @@ public class MobFileCache {
|
|||
|
||||
public void shutdown() {
|
||||
this.scheduleThreadPool.shutdown();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
if (!this.scheduleThreadPool.isShutdown()) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
|
|
|
@ -26,9 +26,9 @@ import org.apache.hadoop.hbase.util.MD5Hash;
|
|||
* It consists of a md5 of a start key, a date and an uuid.
|
||||
* It looks like md5(start) + date + uuid.
|
||||
* <ol>
|
||||
* <li>0-31 characters: md5 hex string of a start key. Since the length of the start key is not
|
||||
* <li>characters 0-31: md5 hex string of a start key. Since the length of the start key is not
|
||||
* fixed, have to use the md5 instead which has a fix length.</li>
|
||||
* <li>32-39 characters: a string of a date with format yyyymmdd. The date is the latest timestamp
|
||||
* <li>characters 32-39: a string of a date with format yyyymmdd. The date is the latest timestamp
|
||||
* of cells in this file</li>
|
||||
* <li>the remaining characters: the uuid.</li>
|
||||
* </ol>
|
||||
|
|
|
@ -43,6 +43,6 @@ public class MobStoreEngine extends DefaultStoreEngine {
|
|||
*/
|
||||
@Override
|
||||
protected void createCompactor(Configuration conf, Store store) throws IOException {
|
||||
compactor = new DefaultMobCompactor(conf, store);
|
||||
compactor = new DefaultMobStoreCompactor(conf, store);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.CellComparator;
|
|||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
|
@ -63,8 +64,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
|
||||
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;
|
||||
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;
|
||||
import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
|
@ -249,7 +250,7 @@ public class MobUtils {
|
|||
try {
|
||||
stats = fs.listStatus(path);
|
||||
} catch (FileNotFoundException e) {
|
||||
LOG.warn("Fail to find the mob file " + path, e);
|
||||
LOG.warn("Failed to find the mob file " + path, e);
|
||||
}
|
||||
if (null == stats) {
|
||||
// no file found
|
||||
|
@ -287,7 +288,7 @@ public class MobUtils {
|
|||
filesToClean);
|
||||
deletedFileCount = filesToClean.size();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Fail to delete the mob files " + filesToClean, e);
|
||||
LOG.error("Failed to delete the mob files " + filesToClean, e);
|
||||
}
|
||||
}
|
||||
LOG.info(deletedFileCount + " expired mob files are deleted");
|
||||
|
@ -555,7 +556,7 @@ public class MobUtils {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates a writer for the del file in temp directory.
|
||||
* Creates a writer for the mob file in temp directory.
|
||||
* @param conf The current configuration.
|
||||
* @param fs The current file system.
|
||||
* @param family The descriptor of the current column family.
|
||||
|
@ -631,7 +632,7 @@ public class MobUtils {
|
|||
storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
|
||||
storeFile.createReader();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
|
||||
LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
|
||||
throw e;
|
||||
} finally {
|
||||
if (storeFile != null) {
|
||||
|
@ -692,22 +693,22 @@ public class MobUtils {
|
|||
}
|
||||
|
||||
/**
|
||||
* Performs the mob file compaction.
|
||||
* Performs the mob compaction.
|
||||
* @param conf the Configuration
|
||||
* @param fs the file system
|
||||
* @param tableName the table the compact
|
||||
* @param hcd the column descriptor
|
||||
* @param pool the thread pool
|
||||
* @param tableLockManager the tableLock manager
|
||||
* @param isForceAllFiles Whether add all mob files into the compaction.
|
||||
* @param allFiles Whether add all mob files into the compaction.
|
||||
*/
|
||||
public static void doMobFileCompaction(Configuration conf, FileSystem fs, TableName tableName,
|
||||
public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
|
||||
HColumnDescriptor hcd, ExecutorService pool, TableLockManager tableLockManager,
|
||||
boolean isForceAllFiles) throws IOException {
|
||||
String className = conf.get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY,
|
||||
PartitionedMobFileCompactor.class.getName());
|
||||
// instantiate the mob file compactor.
|
||||
MobFileCompactor compactor = null;
|
||||
boolean allFiles) throws IOException {
|
||||
String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY,
|
||||
PartitionedMobCompactor.class.getName());
|
||||
// instantiate the mob compactor.
|
||||
MobCompactor compactor = null;
|
||||
try {
|
||||
compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
|
||||
Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class,
|
||||
|
@ -724,21 +725,21 @@ public class MobUtils {
|
|||
// the tableLockManager might be null in testing. In that case, it is lock-free.
|
||||
if (tableLockManager != null) {
|
||||
lock = tableLockManager.writeLock(MobUtils.getTableLockName(tableName),
|
||||
"Run MobFileCompaction");
|
||||
"Run MobCompactor");
|
||||
lock.acquire();
|
||||
}
|
||||
tableLocked = true;
|
||||
compactor.compact(isForceAllFiles);
|
||||
compactor.compact(allFiles);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Fail to compact the mob files for the column " + hcd.getNameAsString()
|
||||
LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString()
|
||||
+ " in the table " + tableName.getNameAsString(), e);
|
||||
} finally {
|
||||
if (lock != null && tableLocked) {
|
||||
try {
|
||||
lock.release();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Fail to release the write lock for the table " + tableName.getNameAsString(),
|
||||
e);
|
||||
LOG.error(
|
||||
"Failed to release the write lock for the table " + tableName.getNameAsString(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -749,15 +750,15 @@ public class MobUtils {
|
|||
* @param conf the Configuration
|
||||
* @return A thread pool.
|
||||
*/
|
||||
public static ExecutorService createMobFileCompactorThreadPool(Configuration conf) {
|
||||
int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_THREADS_MAX,
|
||||
MobConstants.DEFAULT_MOB_FILE_COMPACTION_THREADS_MAX);
|
||||
public static ExecutorService createMobCompactorThreadPool(Configuration conf) {
|
||||
int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX,
|
||||
MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX);
|
||||
if (maxThreads == 0) {
|
||||
maxThreads = 1;
|
||||
}
|
||||
final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
|
||||
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
|
||||
Threads.newDaemonThreadFactory("MobFileCompactor"), new RejectedExecutionHandler() {
|
||||
Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() {
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
try {
|
||||
|
@ -839,4 +840,19 @@ public class MobUtils {
|
|||
}
|
||||
return cryptoContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether this table has mob-enabled columns.
|
||||
* @param htd The current table descriptor.
|
||||
* @return Whether this table has mob-enabled columns.
|
||||
*/
|
||||
public static boolean hasMobColumns(HTableDescriptor htd) {
|
||||
HColumnDescriptor[] hcds = htd.getColumnFamilies();
|
||||
for (HColumnDescriptor hcd : hcds) {
|
||||
if (hcd.isMobEnabled()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mob.filecompactions;
|
||||
package org.apache.hadoop.hbase.mob.compactions;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
|
@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
* The compaction request for mob files.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class MobFileCompactionRequest {
|
||||
public abstract class MobCompactionRequest {
|
||||
|
||||
protected long selectionTime;
|
||||
protected CompactionType type = CompactionType.PART_FILES;
|
|
@ -16,7 +16,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mob.filecompactions;
|
||||
package org.apache.hadoop.hbase.mob.compactions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -34,10 +34,10 @@ import org.apache.hadoop.hbase.mob.MobUtils;
|
|||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
||||
/**
|
||||
* A mob file compactor to directly compact the mob files.
|
||||
* A mob compactor to directly compact the mob files.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class MobFileCompactor {
|
||||
public abstract class MobCompactor {
|
||||
|
||||
protected FileSystem fs;
|
||||
protected Configuration conf;
|
||||
|
@ -48,7 +48,7 @@ public abstract class MobFileCompactor {
|
|||
protected Path mobFamilyDir;
|
||||
protected ExecutorService pool;
|
||||
|
||||
public MobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
|
||||
public MobCompactor(Configuration conf, FileSystem fs, TableName tableName,
|
||||
HColumnDescriptor column, ExecutorService pool) {
|
||||
this.conf = conf;
|
||||
this.fs = fs;
|
||||
|
@ -70,21 +70,21 @@ public abstract class MobFileCompactor {
|
|||
|
||||
/**
|
||||
* Compacts the mob files by compaction type for the current column family.
|
||||
* @param isForceAllFiles Whether add all mob files into the compaction.
|
||||
* @param allFiles Whether add all mob files into the compaction.
|
||||
* @return The paths of new mob files generated in the compaction.
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<Path> compact(boolean isForceAllFiles) throws IOException {
|
||||
return compact(Arrays.asList(fs.listStatus(mobFamilyDir)), isForceAllFiles);
|
||||
public List<Path> compact(boolean allFiles) throws IOException {
|
||||
return compact(Arrays.asList(fs.listStatus(mobFamilyDir)), allFiles);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compacts the candidate mob files.
|
||||
* @param files The candidate mob files.
|
||||
* @param isForceAllFiles Whether add all mob files into the compaction.
|
||||
* @param allFiles Whether add all mob files into the compaction.
|
||||
* @return The paths of new mob files generated in the compaction.
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
|
||||
public abstract List<Path> compact(List<FileStatus> files, boolean allFiles)
|
||||
throws IOException;
|
||||
}
|
|
@ -16,7 +16,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mob.filecompactions;
|
||||
package org.apache.hadoop.hbase.mob.compactions;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -28,18 +28,18 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
/**
|
||||
* An implementation of {@link MobFileCompactionRequest} that is used in
|
||||
* {@link PartitionedMobFileCompactor}.
|
||||
* An implementation of {@link MobCompactionRequest} that is used in
|
||||
* {@link PartitionedMobCompactor}.
|
||||
* The mob files that have the same start key and date in their names belong to
|
||||
* the same partition.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class PartitionedMobFileCompactionRequest extends MobFileCompactionRequest {
|
||||
public class PartitionedMobCompactionRequest extends MobCompactionRequest {
|
||||
|
||||
protected Collection<FileStatus> delFiles;
|
||||
protected Collection<CompactionPartition> compactionPartitions;
|
||||
|
||||
public PartitionedMobFileCompactionRequest(Collection<CompactionPartition> compactionPartitions,
|
||||
public PartitionedMobCompactionRequest(Collection<CompactionPartition> compactionPartitions,
|
||||
Collection<FileStatus> delFiles) {
|
||||
this.selectionTime = EnvironmentEdgeManager.currentTime();
|
||||
this.compactionPartitions = compactionPartitions;
|
||||
|
@ -63,7 +63,7 @@ public class PartitionedMobFileCompactionRequest extends MobFileCompactionReques
|
|||
}
|
||||
|
||||
/**
|
||||
* The partition in the mob file compaction.
|
||||
* The partition in the mob compaction.
|
||||
* The mob files that have the same start key and date in their names belong to
|
||||
* the same partition.
|
||||
*/
|
||||
|
@ -91,7 +91,7 @@ public class PartitionedMobFileCompactionRequest extends MobFileCompactionReques
|
|||
/**
|
||||
* The partition id that consists of start key and date of the mob file name.
|
||||
*/
|
||||
protected static class CompactionPartitionId {
|
||||
public static class CompactionPartitionId {
|
||||
|
||||
private String startKey;
|
||||
private String date;
|
|
@ -16,7 +16,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mob.filecompactions;
|
||||
package org.apache.hadoop.hbase.mob.compactions;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
@ -39,8 +39,19 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -48,21 +59,29 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
|
|||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobFileName;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
|
||||
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
|
||||
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
|
||||
import org.apache.hadoop.hbase.regionserver.*;
|
||||
import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreScanner;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
/**
|
||||
* An implementation of {@link MobFileCompactor} that compacts the mob files in partitions.
|
||||
* An implementation of {@link MobCompactor} that compacts the mob files in partitions.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class PartitionedMobFileCompactor extends MobFileCompactor {
|
||||
public class PartitionedMobCompactor extends MobCompactor {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(PartitionedMobFileCompactor.class);
|
||||
private static final Log LOG = LogFactory.getLog(PartitionedMobCompactor.class);
|
||||
protected long mergeableSize;
|
||||
protected int delFileMaxCount;
|
||||
/** The number of files compacted in a batch */
|
||||
|
@ -75,16 +94,16 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
private Tag tableNameTag;
|
||||
private Encryption.Context cryptoContext = Encryption.Context.NONE;
|
||||
|
||||
public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
|
||||
public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
|
||||
HColumnDescriptor column, ExecutorService pool) throws IOException {
|
||||
super(conf, fs, tableName, column, pool);
|
||||
mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
|
||||
MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
|
||||
mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
|
||||
MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
|
||||
delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
|
||||
MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
|
||||
// default is 100
|
||||
compactionBatchSize = conf.getInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
|
||||
MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
|
||||
compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
|
||||
MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
|
||||
tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
|
||||
bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
|
||||
tableName.getNamespaceAsString(), tableName.getQualifierAsString())));
|
||||
|
@ -98,14 +117,14 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) throws IOException {
|
||||
public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException {
|
||||
if (files == null || files.isEmpty()) {
|
||||
LOG.info("No candidate mob files");
|
||||
return null;
|
||||
}
|
||||
LOG.info("isForceAllFiles: " + isForceAllFiles);
|
||||
LOG.info("is allFiles: " + allFiles);
|
||||
// find the files to compact.
|
||||
PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
|
||||
PartitionedMobCompactionRequest request = select(files, allFiles);
|
||||
// compact the files.
|
||||
return performCompaction(request);
|
||||
}
|
||||
|
@ -114,12 +133,12 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
* Selects the compacted mob/del files.
|
||||
* Iterates the candidates to find out all the del files and small mob files.
|
||||
* @param candidates All the candidates.
|
||||
* @param isForceAllFiles Whether add all mob files into the compaction.
|
||||
* @param allFiles Whether add all mob files into the compaction.
|
||||
* @return A compaction request.
|
||||
* @throws IOException
|
||||
*/
|
||||
protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates,
|
||||
boolean isForceAllFiles) throws IOException {
|
||||
protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
|
||||
boolean allFiles) throws IOException {
|
||||
Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
|
||||
Map<CompactionPartitionId, CompactionPartition> filesToCompact =
|
||||
new HashMap<CompactionPartitionId, CompactionPartition>();
|
||||
|
@ -143,8 +162,8 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
}
|
||||
if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
|
||||
allDelFiles.add(file);
|
||||
} else if (isForceAllFiles || linkedFile.getLen() < mergeableSize) {
|
||||
// add all files if isForceAllFiles is true,
|
||||
} else if (allFiles || linkedFile.getLen() < mergeableSize) {
|
||||
// add all files if allFiles is true,
|
||||
// otherwise add the small files to the merge pool
|
||||
MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
|
||||
CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
|
||||
|
@ -160,7 +179,7 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
selectedFileCount++;
|
||||
}
|
||||
}
|
||||
PartitionedMobFileCompactionRequest request = new PartitionedMobFileCompactionRequest(
|
||||
PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest(
|
||||
filesToCompact.values(), allDelFiles);
|
||||
if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
|
||||
// all the files are selected
|
||||
|
@ -183,7 +202,7 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
* @return The paths of new mob files generated in the compaction.
|
||||
* @throws IOException
|
||||
*/
|
||||
protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
|
||||
protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
|
||||
throws IOException {
|
||||
// merge the del files
|
||||
List<Path> delFilePaths = new ArrayList<Path>();
|
||||
|
@ -202,12 +221,12 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
LOG.info("After compaction, there are " + paths.size() + " mob files");
|
||||
// archive the del files if all the mob files are selected.
|
||||
if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
|
||||
LOG.info("After a mob file compaction with all files selected, archiving the del files "
|
||||
+ newDelFiles);
|
||||
LOG.info("After a mob compaction with all files selected, archiving the del files "
|
||||
+ newDelPaths);
|
||||
try {
|
||||
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to archive the del files " + newDelFiles, e);
|
||||
LOG.error("Failed to archive the del files " + newDelPaths, e);
|
||||
}
|
||||
}
|
||||
return paths;
|
||||
|
@ -220,7 +239,7 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
* @return The paths of new mob files after compactions.
|
||||
* @throws IOException
|
||||
*/
|
||||
protected List<Path> compactMobFiles(final PartitionedMobFileCompactionRequest request,
|
||||
protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request,
|
||||
final List<StoreFile> delFiles) throws IOException {
|
||||
Collection<CompactionPartition> partitions = request.compactionPartitions;
|
||||
if (partitions == null || partitions.isEmpty()) {
|
||||
|
@ -244,19 +263,19 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
}));
|
||||
}
|
||||
// compact the partitions in parallel.
|
||||
boolean hasFailure = false;
|
||||
List<CompactionPartitionId> failedPartitions = new ArrayList<CompactionPartitionId>();
|
||||
for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
|
||||
try {
|
||||
paths.addAll(result.getValue().get());
|
||||
} catch (Exception e) {
|
||||
// just log the error
|
||||
LOG.error("Failed to compact the partition " + result.getKey(), e);
|
||||
hasFailure = true;
|
||||
failedPartitions.add(result.getKey());
|
||||
}
|
||||
}
|
||||
if (hasFailure) {
|
||||
if (!failedPartitions.isEmpty()) {
|
||||
// if any partition fails in the compaction, directly throw an exception.
|
||||
throw new IOException("Failed to compact the partitions");
|
||||
throw new IOException("Failed to compact the partitions " + failedPartitions);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
|
@ -277,7 +296,7 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
* @return The paths of new mob files after compactions.
|
||||
* @throws IOException
|
||||
*/
|
||||
private List<Path> compactMobFilePartition(PartitionedMobFileCompactionRequest request,
|
||||
private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
|
||||
CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
|
||||
List<Path> newFiles = new ArrayList<Path>();
|
||||
List<FileStatus> files = partition.listFiles();
|
||||
|
@ -331,7 +350,7 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
* @param newFiles The paths of new mob files after compactions.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void compactMobFilesInBatch(PartitionedMobFileCompactionRequest request,
|
||||
private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
|
||||
CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
|
||||
Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
|
||||
throws IOException {
|
||||
|
@ -411,7 +430,7 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
* is necessary.
|
||||
* @throws IOException
|
||||
*/
|
||||
protected List<Path> compactDelFiles(PartitionedMobFileCompactionRequest request,
|
||||
protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request,
|
||||
List<Path> delFilePaths) throws IOException {
|
||||
if (delFilePaths.size() <= delFileMaxCount) {
|
||||
return delFilePaths;
|
||||
|
@ -451,7 +470,7 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
|
|||
* @return The path of new del file after merging.
|
||||
* @throws IOException
|
||||
*/
|
||||
private Path compactDelFilesInBatch(PartitionedMobFileCompactionRequest request,
|
||||
private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request,
|
||||
List<StoreFile> delFiles) throws IOException {
|
||||
// create a scanner for the del files.
|
||||
StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
|
|
@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.io.crypto.Encryption;
|
|||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
|
||||
import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
|
||||
import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer.SweepPartitionId;
|
||||
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStore;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
|
||||
|
@ -68,7 +68,7 @@ public class MemStoreWrapper {
|
|||
|
||||
private MemStore memstore;
|
||||
private long flushSize;
|
||||
private SweepPartitionId partitionId;
|
||||
private CompactionPartitionId partitionId;
|
||||
private Context context;
|
||||
private Configuration conf;
|
||||
private BufferedMutator table;
|
||||
|
@ -78,8 +78,8 @@ public class MemStoreWrapper {
|
|||
private CacheConfig cacheConfig;
|
||||
private Encryption.Context cryptoContext = Encryption.Context.NONE;
|
||||
|
||||
public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table, HColumnDescriptor hcd,
|
||||
MemStore memstore, CacheConfig cacheConfig) throws IOException {
|
||||
public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table,
|
||||
HColumnDescriptor hcd, MemStore memstore, CacheConfig cacheConfig) throws IOException {
|
||||
this.memstore = memstore;
|
||||
this.context = context;
|
||||
this.fs = fs;
|
||||
|
@ -93,7 +93,7 @@ public class MemStoreWrapper {
|
|||
cryptoContext = MobUtils.createEncryptionContext(conf, hcd);
|
||||
}
|
||||
|
||||
public void setPartitionId(SweepPartitionId partitionId) {
|
||||
public void setPartitionId(CompactionPartitionId partitionId) {
|
||||
this.partitionId = partitionId;
|
||||
}
|
||||
|
||||
|
@ -155,16 +155,19 @@ public class MemStoreWrapper {
|
|||
scanner = snapshot.getScanner();
|
||||
scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
|
||||
cell = null;
|
||||
Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes(this.table.getName().toString()));
|
||||
Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes(this.table.getName()
|
||||
.toString()));
|
||||
long updatedCount = 0;
|
||||
while (null != (cell = scanner.next())) {
|
||||
KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
|
||||
Put put =
|
||||
new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
|
||||
put.add(reference);
|
||||
table.mutate(put);
|
||||
context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1);
|
||||
updatedCount++;
|
||||
}
|
||||
table.flush();
|
||||
context.getCounter(SweepCounter.RECORDS_UPDATED).increment(updatedCount);
|
||||
scanner.close();
|
||||
}
|
||||
|
||||
|
|
|
@ -85,12 +85,12 @@ public class SweepJob {
|
|||
private final FileSystem fs;
|
||||
private final Configuration conf;
|
||||
private static final Log LOG = LogFactory.getLog(SweepJob.class);
|
||||
static final String SWEEP_JOB_ID = "mob.sweep.job.id";
|
||||
static final String SWEEP_JOB_SERVERNAME = "mob.sweep.job.servername";
|
||||
static final String SWEEP_JOB_TABLE_NODE = "mob.sweep.job.table.node";
|
||||
static final String WORKING_DIR_KEY = "mob.sweep.job.dir";
|
||||
static final String WORKING_ALLNAMES_FILE_KEY = "mob.sweep.job.all.file";
|
||||
static final String WORKING_VISITED_DIR_KEY = "mob.sweep.job.visited.dir";
|
||||
static final String SWEEP_JOB_ID = "hbase.mob.sweep.job.id";
|
||||
static final String SWEEP_JOB_SERVERNAME = "hbase.mob.sweep.job.servername";
|
||||
static final String SWEEP_JOB_TABLE_NODE = "hbase.mob.sweep.job.table.node";
|
||||
static final String WORKING_DIR_KEY = "hbase.mob.sweep.job.dir";
|
||||
static final String WORKING_ALLNAMES_FILE_KEY = "hbase.mob.sweep.job.all.file";
|
||||
static final String WORKING_VISITED_DIR_KEY = "hbase.mob.sweep.job.visited.dir";
|
||||
static final String WORKING_ALLNAMES_DIR = "all";
|
||||
static final String WORKING_VISITED_DIR = "visited";
|
||||
public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir";
|
||||
|
@ -228,7 +228,7 @@ public class SweepJob {
|
|||
try {
|
||||
lock.release();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Fail to release the table lock " + tableName, e);
|
||||
LOG.error("Failed to release the table lock " + tableName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -435,7 +435,7 @@ public class SweepJob {
|
|||
FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
|
||||
LOG.info(storeFiles.size() + " unused MOB files are removed");
|
||||
} catch (Exception e) {
|
||||
LOG.error("Fail to archive the store files " + storeFiles, e);
|
||||
LOG.error("Failed to archive the store files " + storeFiles, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -452,7 +452,7 @@ public class SweepJob {
|
|||
try {
|
||||
fs.delete(workingPath, true);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Fail to delete the working directory after sweeping store " + familyName
|
||||
LOG.warn("Failed to delete the working directory after sweeping store " + familyName
|
||||
+ " in the table " + tn.getNameAsString(), e);
|
||||
}
|
||||
}
|
||||
|
@ -480,10 +480,12 @@ public class SweepJob {
|
|||
|
||||
@Override
|
||||
public int compareTo(IndexedResult o) {
|
||||
if (this.value == null) {
|
||||
if (this.value == null && o.getValue() == null) {
|
||||
return 0;
|
||||
} else if (o.value == null) {
|
||||
return 1;
|
||||
} else if (this.value == null) {
|
||||
return -1;
|
||||
} else {
|
||||
return this.value.compareTo(o.value);
|
||||
}
|
||||
|
|
|
@ -43,7 +43,11 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
|
||||
|
@ -51,6 +55,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
|
|||
import org.apache.hadoop.hbase.mob.MobFile;
|
||||
import org.apache.hadoop.hbase.mob.MobFileName;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
|
||||
import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
|
||||
import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
|
@ -120,7 +125,7 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
|
|||
try {
|
||||
admin.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Fail to close the HBaseAdmin", e);
|
||||
LOG.warn("Failed to close the HBaseAdmin", e);
|
||||
}
|
||||
}
|
||||
// disable the block cache.
|
||||
|
@ -138,7 +143,8 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
|
|||
mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
|
||||
}
|
||||
|
||||
private SweepPartition createPartition(SweepPartitionId id, Context context) throws IOException {
|
||||
private SweepPartition createPartition(CompactionPartitionId id, Context context)
|
||||
throws IOException {
|
||||
return new SweepPartition(id, context);
|
||||
}
|
||||
|
||||
|
@ -161,13 +167,13 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
|
|||
fout = fs.create(nameFilePath, true);
|
||||
writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
|
||||
String.class, CompressionType.NONE, null);
|
||||
SweepPartitionId id;
|
||||
CompactionPartitionId id;
|
||||
SweepPartition partition = null;
|
||||
// the mob files which have the same start key and date are in the same partition.
|
||||
while (context.nextKey()) {
|
||||
Text key = context.getCurrentKey();
|
||||
String keyString = key.toString();
|
||||
id = SweepPartitionId.create(keyString);
|
||||
id = createPartitionId(keyString);
|
||||
if (null == partition || !id.equals(partition.getId())) {
|
||||
// It's the first mob file in the current partition.
|
||||
if (null != partition) {
|
||||
|
@ -215,21 +221,21 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
|
|||
*/
|
||||
public class SweepPartition {
|
||||
|
||||
private final SweepPartitionId id;
|
||||
private final CompactionPartitionId id;
|
||||
private final Context context;
|
||||
private boolean memstoreUpdated = false;
|
||||
private boolean mergeSmall = false;
|
||||
private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
|
||||
private final List<Path> toBeDeleted = new ArrayList<Path>();
|
||||
|
||||
public SweepPartition(SweepPartitionId id, Context context) throws IOException {
|
||||
public SweepPartition(CompactionPartitionId id, Context context) throws IOException {
|
||||
this.id = id;
|
||||
this.context = context;
|
||||
memstore.setPartitionId(id);
|
||||
init();
|
||||
}
|
||||
|
||||
public SweepPartitionId getId() {
|
||||
public CompactionPartitionId getId() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
|
@ -294,7 +300,7 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
|
|||
storeFiles);
|
||||
context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Fail to archive the store files " + storeFiles, e);
|
||||
LOG.error("Failed to archive the store files " + storeFiles, e);
|
||||
}
|
||||
storeFiles.clear();
|
||||
}
|
||||
|
@ -390,58 +396,13 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
|
|||
}
|
||||
|
||||
/**
|
||||
* The sweep partition id.
|
||||
* It consists of the start key and date.
|
||||
* The start key is a hex string of the checksum of a region start key.
|
||||
* The date is the latest timestamp of cells in a mob file.
|
||||
* Creates the partition id.
|
||||
* @param fileNameAsString The current file name, in string.
|
||||
* @return The partition id.
|
||||
*/
|
||||
public static class SweepPartitionId {
|
||||
private String date;
|
||||
private String startKey;
|
||||
|
||||
public SweepPartitionId(MobFileName fileName) {
|
||||
this.date = fileName.getDate();
|
||||
this.startKey = fileName.getStartKey();
|
||||
}
|
||||
|
||||
public SweepPartitionId(String date, String startKey) {
|
||||
this.date = date;
|
||||
this.startKey = startKey;
|
||||
}
|
||||
|
||||
public static SweepPartitionId create(String key) {
|
||||
return new SweepPartitionId(MobFileName.create(key));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object anObject) {
|
||||
if (this == anObject) {
|
||||
return true;
|
||||
}
|
||||
if (anObject instanceof SweepPartitionId) {
|
||||
SweepPartitionId another = (SweepPartitionId) anObject;
|
||||
if (this.date.equals(another.getDate()) && this.startKey.equals(another.getStartKey())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public String getDate() {
|
||||
return this.date;
|
||||
}
|
||||
|
||||
public String getStartKey() {
|
||||
return this.startKey;
|
||||
}
|
||||
|
||||
public void setDate(String date) {
|
||||
this.date = date;
|
||||
}
|
||||
|
||||
public void setStartKey(String startKey) {
|
||||
this.startKey = startKey;
|
||||
}
|
||||
private CompactionPartitionId createPartitionId(String fileNameAsString) {
|
||||
MobFileName fileName = MobFileName.create(fileNameAsString);
|
||||
return new CompactionPartitionId(fileName.getStartKey(), fileName.getDate());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
|
@ -43,6 +44,7 @@ import com.google.protobuf.ServiceException;
|
|||
* same column family are mutually exclusive too.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class Sweeper extends Configured implements Tool {
|
||||
|
||||
/**
|
||||
|
@ -82,7 +84,7 @@ public class Sweeper extends Configured implements Tool {
|
|||
try {
|
||||
admin.close();
|
||||
} catch (IOException e) {
|
||||
System.out.println("Fail to close the HBaseAdmin: " + e.getMessage());
|
||||
System.out.println("Failed to close the HBaseAdmin: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,10 +85,10 @@ public class HMobStore extends HStore {
|
|||
private MobCacheConfig mobCacheConfig;
|
||||
private Path homePath;
|
||||
private Path mobFamilyPath;
|
||||
private volatile long mobCompactedIntoMobCellsCount = 0;
|
||||
private volatile long mobCompactedFromMobCellsCount = 0;
|
||||
private volatile long mobCompactedIntoMobCellsSize = 0;
|
||||
private volatile long mobCompactedFromMobCellsSize = 0;
|
||||
private volatile long cellsCountCompactedToMob = 0;
|
||||
private volatile long cellsCountCompactedFromMob = 0;
|
||||
private volatile long cellsSizeCompactedToMob = 0;
|
||||
private volatile long cellsSizeCompactedFromMob = 0;
|
||||
private volatile long mobFlushCount = 0;
|
||||
private volatile long mobFlushedCellsCount = 0;
|
||||
private volatile long mobFlushedCellsSize = 0;
|
||||
|
@ -490,36 +490,36 @@ public class HMobStore extends HStore {
|
|||
}
|
||||
}
|
||||
|
||||
public void updateMobCompactedIntoMobCellsCount(long count) {
|
||||
mobCompactedIntoMobCellsCount += count;
|
||||
public void updateCellsCountCompactedToMob(long count) {
|
||||
cellsCountCompactedToMob += count;
|
||||
}
|
||||
|
||||
public long getMobCompactedIntoMobCellsCount() {
|
||||
return mobCompactedIntoMobCellsCount;
|
||||
public long getCellsCountCompactedToMob() {
|
||||
return cellsCountCompactedToMob;
|
||||
}
|
||||
|
||||
public void updateMobCompactedFromMobCellsCount(long count) {
|
||||
mobCompactedFromMobCellsCount += count;
|
||||
public void updateCellsCountCompactedFromMob(long count) {
|
||||
cellsCountCompactedFromMob += count;
|
||||
}
|
||||
|
||||
public long getMobCompactedFromMobCellsCount() {
|
||||
return mobCompactedFromMobCellsCount;
|
||||
public long getCellsCountCompactedFromMob() {
|
||||
return cellsCountCompactedFromMob;
|
||||
}
|
||||
|
||||
public void updateMobCompactedIntoMobCellsSize(long size) {
|
||||
mobCompactedIntoMobCellsSize += size;
|
||||
public void updateCellsSizeCompactedToMob(long size) {
|
||||
cellsSizeCompactedToMob += size;
|
||||
}
|
||||
|
||||
public long getMobCompactedIntoMobCellsSize() {
|
||||
return mobCompactedIntoMobCellsSize;
|
||||
public long getCellsSizeCompactedToMob() {
|
||||
return cellsSizeCompactedToMob;
|
||||
}
|
||||
|
||||
public void updateMobCompactedFromMobCellsSize(long size) {
|
||||
mobCompactedFromMobCellsSize += size;
|
||||
public void updateCellsSizeCompactedFromMob(long size) {
|
||||
cellsSizeCompactedFromMob += size;
|
||||
}
|
||||
|
||||
public long getMobCompactedFromMobCellsSize() {
|
||||
return mobCompactedFromMobCellsSize;
|
||||
public long getCellsSizeCompactedFromMob() {
|
||||
return cellsSizeCompactedFromMob;
|
||||
}
|
||||
|
||||
public void updateMobFlushCount() {
|
||||
|
|
|
@ -82,10 +82,10 @@ class MetricsRegionServerWrapperImpl
|
|||
private volatile long flushedCellsSize = 0;
|
||||
private volatile long compactedCellsSize = 0;
|
||||
private volatile long majorCompactedCellsSize = 0;
|
||||
private volatile long mobCompactedIntoMobCellsCount = 0;
|
||||
private volatile long mobCompactedFromMobCellsCount = 0;
|
||||
private volatile long mobCompactedIntoMobCellsSize = 0;
|
||||
private volatile long mobCompactedFromMobCellsSize = 0;
|
||||
private volatile long cellsCountCompactedToMob = 0;
|
||||
private volatile long cellsCountCompactedFromMob = 0;
|
||||
private volatile long cellsSizeCompactedToMob = 0;
|
||||
private volatile long cellsSizeCompactedFromMob = 0;
|
||||
private volatile long mobFlushCount = 0;
|
||||
private volatile long mobFlushedCellsCount = 0;
|
||||
private volatile long mobFlushedCellsSize = 0;
|
||||
|
@ -449,23 +449,23 @@ class MetricsRegionServerWrapperImpl
|
|||
}
|
||||
|
||||
@Override
|
||||
public long getMobCompactedFromMobCellsCount() {
|
||||
return mobCompactedFromMobCellsCount;
|
||||
public long getCellsCountCompactedFromMob() {
|
||||
return cellsCountCompactedFromMob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMobCompactedIntoMobCellsCount() {
|
||||
return mobCompactedIntoMobCellsCount;
|
||||
public long getCellsCountCompactedToMob() {
|
||||
return cellsCountCompactedToMob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMobCompactedFromMobCellsSize() {
|
||||
return mobCompactedFromMobCellsSize;
|
||||
public long getCellsSizeCompactedFromMob() {
|
||||
return cellsSizeCompactedFromMob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMobCompactedIntoMobCellsSize() {
|
||||
return mobCompactedIntoMobCellsSize;
|
||||
public long getCellsSizeCompactedToMob() {
|
||||
return cellsSizeCompactedToMob;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -560,10 +560,10 @@ class MetricsRegionServerWrapperImpl
|
|||
long tempFlushedCellsSize = 0;
|
||||
long tempCompactedCellsSize = 0;
|
||||
long tempMajorCompactedCellsSize = 0;
|
||||
long tempMobCompactedIntoMobCellsCount = 0;
|
||||
long tempMobCompactedFromMobCellsCount = 0;
|
||||
long tempMobCompactedIntoMobCellsSize = 0;
|
||||
long tempMobCompactedFromMobCellsSize = 0;
|
||||
long tempCellsCountCompactedToMob = 0;
|
||||
long tempCellsCountCompactedFromMob = 0;
|
||||
long tempCellsSizeCompactedToMob = 0;
|
||||
long tempCellsSizeCompactedFromMob = 0;
|
||||
long tempMobFlushCount = 0;
|
||||
long tempMobFlushedCellsCount = 0;
|
||||
long tempMobFlushedCellsSize = 0;
|
||||
|
@ -596,10 +596,10 @@ class MetricsRegionServerWrapperImpl
|
|||
tempMajorCompactedCellsSize += store.getMajorCompactedCellsSize();
|
||||
if (store instanceof HMobStore) {
|
||||
HMobStore mobStore = (HMobStore) store;
|
||||
tempMobCompactedIntoMobCellsCount += mobStore.getMobCompactedIntoMobCellsCount();
|
||||
tempMobCompactedFromMobCellsCount += mobStore.getMobCompactedFromMobCellsCount();
|
||||
tempMobCompactedIntoMobCellsSize += mobStore.getMobCompactedIntoMobCellsSize();
|
||||
tempMobCompactedFromMobCellsSize += mobStore.getMobCompactedFromMobCellsSize();
|
||||
tempCellsCountCompactedToMob += mobStore.getCellsCountCompactedToMob();
|
||||
tempCellsCountCompactedFromMob += mobStore.getCellsCountCompactedFromMob();
|
||||
tempCellsSizeCompactedToMob += mobStore.getCellsSizeCompactedToMob();
|
||||
tempCellsSizeCompactedFromMob += mobStore.getCellsSizeCompactedFromMob();
|
||||
tempMobFlushCount += mobStore.getMobFlushCount();
|
||||
tempMobFlushedCellsCount += mobStore.getMobFlushedCellsCount();
|
||||
tempMobFlushedCellsSize += mobStore.getMobFlushedCellsSize();
|
||||
|
@ -666,10 +666,10 @@ class MetricsRegionServerWrapperImpl
|
|||
flushedCellsSize = tempFlushedCellsSize;
|
||||
compactedCellsSize = tempCompactedCellsSize;
|
||||
majorCompactedCellsSize = tempMajorCompactedCellsSize;
|
||||
mobCompactedIntoMobCellsCount = tempMobCompactedIntoMobCellsCount;
|
||||
mobCompactedFromMobCellsCount = tempMobCompactedFromMobCellsCount;
|
||||
mobCompactedIntoMobCellsSize = tempMobCompactedIntoMobCellsSize;
|
||||
mobCompactedFromMobCellsSize = tempMobCompactedFromMobCellsSize;
|
||||
cellsCountCompactedToMob = tempCellsCountCompactedToMob;
|
||||
cellsCountCompactedFromMob = tempCellsCountCompactedFromMob;
|
||||
cellsSizeCompactedToMob = tempCellsSizeCompactedToMob;
|
||||
cellsSizeCompactedFromMob = tempCellsSizeCompactedFromMob;
|
||||
mobFlushCount = tempMobFlushCount;
|
||||
mobFlushedCellsCount = tempMobFlushedCellsCount;
|
||||
mobFlushedCellsSize = tempMobFlushedCellsSize;
|
||||
|
|
|
@ -49,7 +49,7 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
|
|||
}
|
||||
|
||||
/**
|
||||
* Firstly reads the cells from the HBase. If the cell are a reference cell (which has the
|
||||
* Firstly reads the cells from the HBase. If the cell is a reference cell (which has the
|
||||
* reference tag), the scanner need seek this cell from the mob file, and use the cell found
|
||||
* from the mob file as the result.
|
||||
*/
|
||||
|
|
|
@ -58,7 +58,7 @@ public class StoreFileInfo {
|
|||
Pattern.compile("^(" + HFILE_NAME_REGEX + ")");
|
||||
|
||||
/**
|
||||
* A non-capture group, for hfiles, so that this can be embedded.
|
||||
* A non-capture group, for del files, so that this can be embedded.
|
||||
* A del file has (_del) as suffix.
|
||||
*/
|
||||
public static final String DELFILE_NAME_REGEX = "[0-9a-f]+(?:_del)";
|
||||
|
|
|
@ -159,12 +159,12 @@ public class SnapshotManifest {
|
|||
RegionVisitor visitor = createRegionVisitor(desc);
|
||||
|
||||
// 1. dump region meta info into the snapshot directory
|
||||
LOG.debug("Storing '" + regionInfo + "' region-info for snapshot.");
|
||||
LOG.debug("Storing mob region '" + regionInfo + "' region-info for snapshot.");
|
||||
Object regionData = visitor.regionOpen(regionInfo);
|
||||
monitor.rethrowException();
|
||||
|
||||
// 2. iterate through all the stores in the region
|
||||
LOG.debug("Creating references for hfiles");
|
||||
LOG.debug("Creating references for mob files");
|
||||
|
||||
Path mobRegionPath = MobUtils.getMobRegionPath(conf, regionInfo.getTable());
|
||||
for (HColumnDescriptor hcd : hcds) {
|
||||
|
@ -188,7 +188,7 @@ public class SnapshotManifest {
|
|||
storeFiles.add(new StoreFileInfo(conf, fs, stat));
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Adding snapshot references for " + storeFiles + " hfiles");
|
||||
LOG.debug("Adding snapshot references for " + storeFiles + " mob files");
|
||||
}
|
||||
|
||||
// 2.2. iterate through all the mob files and create "references".
|
||||
|
@ -198,7 +198,7 @@ public class SnapshotManifest {
|
|||
|
||||
// create "reference" to this store file.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Adding reference for file (" + (i + 1) + "/" + sz + "): "
|
||||
LOG.debug("Adding reference for mob file (" + (i + 1) + "/" + sz + "): "
|
||||
+ storeFile.getPath());
|
||||
}
|
||||
visitor.storeFile(regionData, familyData, storeFile);
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mob.filecompactions;
|
||||
package org.apache.hadoop.hbase.mob.compactions;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
|
@ -86,7 +88,7 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(LargeTests.class)
|
||||
public class TestMobFileCompactor {
|
||||
public class TestMobCompactor {
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private Configuration conf = null;
|
||||
private String tableNameAsString;
|
||||
|
@ -116,7 +118,7 @@ public class TestMobFileCompactor {
|
|||
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
|
||||
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
|
||||
TEST_UTIL.getConfiguration()
|
||||
.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, 5000);
|
||||
.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000);
|
||||
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
|
||||
KeyProviderForTesting.class.getName());
|
||||
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||
|
@ -199,7 +201,7 @@ public class TestMobFileCompactor {
|
|||
countFiles(tableName, true, family1));
|
||||
assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1));
|
||||
|
||||
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
|
||||
MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
|
||||
compactor.compact();
|
||||
|
||||
assertEquals("After compaction: mob rows count", regionNum * rowNumPerRegion,
|
||||
|
@ -228,7 +230,7 @@ public class TestMobFileCompactor {
|
|||
countFiles(tableName, true, family1));
|
||||
assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1));
|
||||
|
||||
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
|
||||
MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
|
||||
compactor.compact();
|
||||
|
||||
assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
|
||||
|
@ -278,7 +280,7 @@ public class TestMobFileCompactor {
|
|||
countFiles(tableName, true, family1));
|
||||
assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1));
|
||||
|
||||
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd, pool);
|
||||
MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd, pool);
|
||||
compactor.compact();
|
||||
|
||||
assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
|
||||
|
@ -322,7 +324,7 @@ public class TestMobFileCompactor {
|
|||
countFiles(tableName, false, family2));
|
||||
|
||||
// do the mob file compaction
|
||||
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
|
||||
MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
|
||||
compactor.compact();
|
||||
|
||||
assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
|
||||
|
@ -345,7 +347,7 @@ public class TestMobFileCompactor {
|
|||
resetConf();
|
||||
int mergeSize = 5000;
|
||||
// change the mob compaction merge size
|
||||
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
|
||||
conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
|
||||
|
||||
int count = 4;
|
||||
// generate mob files
|
||||
|
@ -376,7 +378,7 @@ public class TestMobFileCompactor {
|
|||
countFiles(tableName, false, family2));
|
||||
|
||||
// do the mob file compaction
|
||||
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
|
||||
MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
|
||||
compactor.compact();
|
||||
|
||||
assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
|
||||
|
@ -399,7 +401,7 @@ public class TestMobFileCompactor {
|
|||
public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() throws Exception {
|
||||
resetConf();
|
||||
int batchSize = 2;
|
||||
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize);
|
||||
conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, batchSize);
|
||||
int count = 4;
|
||||
// generate mob files
|
||||
loadData(admin, bufMut, tableName, count, rowNumPerFile);
|
||||
|
@ -427,8 +429,8 @@ public class TestMobFileCompactor {
|
|||
assertEquals("Before compaction: family2 del file count", regionNum,
|
||||
countFiles(tableName, false, family2));
|
||||
|
||||
// do the mob file compaction
|
||||
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
|
||||
// do the mob compaction
|
||||
MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
|
||||
compactor.compact();
|
||||
|
||||
assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
|
||||
|
@ -473,8 +475,8 @@ public class TestMobFileCompactor {
|
|||
assertEquals("Before compaction: family2 del file count", regionNum,
|
||||
countFiles(tableName, false, family2));
|
||||
|
||||
// do the mob file compaction
|
||||
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
|
||||
// do the mob compaction
|
||||
MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
|
||||
compactor.compact();
|
||||
|
||||
assertEquals("After first compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
|
||||
|
@ -565,7 +567,7 @@ public class TestMobFileCompactor {
|
|||
countFiles(tableName, false, family2));
|
||||
|
||||
int largeFilesCount = countLargeFiles(5000, family1);
|
||||
// do the mob file compaction
|
||||
// do the mob compaction
|
||||
admin.compactMob(tableName, hcd1.getName());
|
||||
|
||||
waitUntilCompactionFinished(tableName);
|
||||
|
@ -613,7 +615,7 @@ public class TestMobFileCompactor {
|
|||
assertEquals("Before compaction: family2 del file count", regionNum,
|
||||
countFiles(tableName, false, family2));
|
||||
|
||||
// do the major mob file compaction, it will force all files to compaction
|
||||
// do the major mob compaction, it will force all files to compaction
|
||||
admin.majorCompactMob(tableName, hcd1.getName());
|
||||
|
||||
waitUntilCompactionFinished(tableName);
|
||||
|
@ -914,9 +916,9 @@ public class TestMobFileCompactor {
|
|||
* Resets the configuration.
|
||||
*/
|
||||
private void resetConf() {
|
||||
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
|
||||
MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
|
||||
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
|
||||
MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
|
||||
conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
|
||||
MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
|
||||
conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
|
||||
MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
|
||||
}
|
||||
}
|
|
@ -16,19 +16,19 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mob.filecompactions;
|
||||
package org.apache.hadoop.hbase.mob.compactions;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
|
||||
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestPartitionedMobFileCompactionRequest {
|
||||
public class TestPartitionedMobCompactionRequest {
|
||||
|
||||
@Test
|
||||
public void testCompactedPartitionId() {
|
|
@ -16,7 +16,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mob.filecompactions;
|
||||
package org.apache.hadoop.hbase.mob.compactions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -48,8 +48,10 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
|||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobFileName;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
|
||||
import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
|
||||
import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
|
||||
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
@ -60,7 +62,7 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(LargeTests.class)
|
||||
public class TestPartitionedMobFileCompactor {
|
||||
public class TestPartitionedMobCompactor {
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private final static String family = "family";
|
||||
private final static String qf = "qf";
|
||||
|
@ -111,7 +113,7 @@ public class TestPartitionedMobFileCompactor {
|
|||
// create 10 del files
|
||||
createStoreFiles(basePath, family, qf, count, Type.Delete);
|
||||
listFiles();
|
||||
long mergeSize = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD;
|
||||
long mergeSize = MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD;
|
||||
List<String> expectedStartKeys = new ArrayList<>();
|
||||
for(FileStatus file : mobFiles) {
|
||||
if(file.getLen() < mergeSize) {
|
||||
|
@ -143,8 +145,8 @@ public class TestPartitionedMobFileCompactor {
|
|||
expectedStartKeys.add(startKey);
|
||||
}
|
||||
}
|
||||
// set the mob file compaction mergeable threshold
|
||||
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
|
||||
// set the mob compaction mergeable threshold
|
||||
conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
|
||||
testSelectFiles(tableName, CompactionType.PART_FILES, false, expectedStartKeys);
|
||||
}
|
||||
|
||||
|
@ -166,8 +168,8 @@ public class TestPartitionedMobFileCompactor {
|
|||
String startKey = fileName.substring(0, 32);
|
||||
expectedStartKeys.add(startKey);
|
||||
}
|
||||
// set the mob file compaction mergeable threshold
|
||||
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
|
||||
// set the mob compaction mergeable threshold
|
||||
conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
|
||||
testSelectFiles(tableName, CompactionType.ALL_FILES, true, expectedStartKeys);
|
||||
}
|
||||
|
||||
|
@ -195,8 +197,8 @@ public class TestPartitionedMobFileCompactor {
|
|||
createStoreFiles(basePath, family, qf, 13, Type.Delete);
|
||||
listFiles();
|
||||
|
||||
// set the mob file compaction batch size
|
||||
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4);
|
||||
// set the mob compaction batch size
|
||||
conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 4);
|
||||
testCompactDelFiles(tableName, 1, 13, false);
|
||||
}
|
||||
|
||||
|
@ -213,8 +215,8 @@ public class TestPartitionedMobFileCompactor {
|
|||
|
||||
// set the max del file count
|
||||
conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
|
||||
// set the mob file compaction batch size
|
||||
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2);
|
||||
// set the mob compaction batch size
|
||||
conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 2);
|
||||
testCompactDelFiles(tableName, 4, 13, false);
|
||||
}
|
||||
|
||||
|
@ -222,11 +224,12 @@ public class TestPartitionedMobFileCompactor {
|
|||
* Tests the selectFiles
|
||||
* @param tableName the table name
|
||||
* @param type the expected compaction type
|
||||
* @param isForceAllFiles whether all the mob files are selected
|
||||
* @param expected the expected start keys
|
||||
*/
|
||||
private void testSelectFiles(String tableName, final CompactionType type,
|
||||
final boolean isForceAllFiles, final List<String> expected) throws IOException {
|
||||
PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
|
||||
PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
|
||||
TableName.valueOf(tableName), hcd, pool) {
|
||||
@Override
|
||||
public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
|
||||
|
@ -234,7 +237,7 @@ public class TestPartitionedMobFileCompactor {
|
|||
if (files == null || files.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
|
||||
PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
|
||||
// assert the compaction type
|
||||
Assert.assertEquals(type, request.type);
|
||||
// assert get the right partitions
|
||||
|
@ -252,13 +255,14 @@ public class TestPartitionedMobFileCompactor {
|
|||
* @param tableName the table name
|
||||
* @param expectedFileCount the expected file count
|
||||
* @param expectedCellCount the expected cell count
|
||||
* @param isForceAllFiles whether all the mob files are selected
|
||||
*/
|
||||
private void testCompactDelFiles(String tableName, final int expectedFileCount,
|
||||
final int expectedCellCount, boolean isForceAllFiles) throws IOException {
|
||||
PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
|
||||
PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
|
||||
TableName.valueOf(tableName), hcd, pool) {
|
||||
@Override
|
||||
protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
|
||||
protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
|
||||
throws IOException {
|
||||
List<Path> delFilePaths = new ArrayList<Path>();
|
||||
for (FileStatus delFile : request.delFiles) {
|
||||
|
@ -427,10 +431,10 @@ public class TestPartitionedMobFileCompactor {
|
|||
* Resets the configuration.
|
||||
*/
|
||||
private void resetConf() {
|
||||
conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
|
||||
MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
|
||||
conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
|
||||
MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
|
||||
conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
|
||||
conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
|
||||
MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
|
||||
conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
|
||||
MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
|
||||
}
|
||||
}
|
|
@ -267,22 +267,22 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe
|
|||
}
|
||||
|
||||
@Override
|
||||
public long getMobCompactedIntoMobCellsCount() {
|
||||
public long getCellsCountCompactedToMob() {
|
||||
return 20;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMobCompactedFromMobCellsCount() {
|
||||
public long getCellsCountCompactedFromMob() {
|
||||
return 10;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMobCompactedIntoMobCellsSize() {
|
||||
public long getCellsSizeCompactedToMob() {
|
||||
return 200;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMobCompactedFromMobCellsSize() {
|
||||
public long getCellsSizeCompactedFromMob() {
|
||||
return 100;
|
||||
}
|
||||
|
||||
|
|
|
@ -62,13 +62,13 @@ import org.junit.experimental.categories.Category;
|
|||
import org.junit.rules.TestName;
|
||||
|
||||
/**
|
||||
* Test mob compaction
|
||||
* Test mob store compaction
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestMobCompaction {
|
||||
public class TestMobStoreCompaction {
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
|
||||
static final Log LOG = LogFactory.getLog(TestMobStoreCompaction.class.getName());
|
||||
private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private Configuration conf = null;
|
||||
|
||||
|
@ -178,8 +178,8 @@ public class TestMobCompaction {
|
|||
}
|
||||
|
||||
/**
|
||||
* This test will first generate store files, then bulk load them and trigger the compaction. When
|
||||
* compaction, the cell value will be larger than the threshold.
|
||||
* This test will first generate store files, then bulk load them and trigger the compaction.
|
||||
* When compaction, the cell value will be larger than the threshold.
|
||||
*/
|
||||
@Test
|
||||
public void testMobCompactionWithBulkload() throws Exception {
|
|
@ -998,7 +998,7 @@ module Hbase
|
|||
# Requests a mob file compaction
|
||||
def compact_mob(table_name, family = nil)
|
||||
if family == nil
|
||||
@admin.compactMob(org.apache.hadoop.hbase.TableName.valueOf(table_name))
|
||||
@admin.compactMobs(org.apache.hadoop.hbase.TableName.valueOf(table_name))
|
||||
else
|
||||
# We are compacting a mob column family within a table.
|
||||
@admin.compactMob(org.apache.hadoop.hbase.TableName.valueOf(table_name), family.to_java_bytes)
|
||||
|
@ -1009,7 +1009,7 @@ module Hbase
|
|||
# Requests a mob file major compaction
|
||||
def major_compact_mob(table_name, family = nil)
|
||||
if family == nil
|
||||
@admin.majorCompactMob(org.apache.hadoop.hbase.TableName.valueOf(table_name))
|
||||
@admin.majorCompactMobs(org.apache.hadoop.hbase.TableName.valueOf(table_name))
|
||||
else
|
||||
# We are major compacting a mob column family within a table.
|
||||
@admin.majorCompactMob(org.apache.hadoop.hbase.TableName.valueOf(table_name), family.to_java_bytes)
|
||||
|
|
Loading…
Reference in New Issue