HBASE-1476 Multithreaded Compactions

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1101677 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Nicolas Spiegelberg 2011-05-10 23:21:04 +00:00
parent 1f4eb71478
commit 770b745807
8 changed files with 304 additions and 158 deletions

View File

@ -212,6 +212,8 @@ Release 0.91.0 - Unreleased
HBASE-3721 Speedup LoadIncrementalHFiles (Ted Yu) HBASE-3721 Speedup LoadIncrementalHFiles (Ted Yu)
HBASE-3855 Performance degradation of memstore because reseek is linear HBASE-3855 Performance degradation of memstore because reseek is linear
(dhruba borthakur) (dhruba borthakur)
HBASE-3797 StoreFile Level Compaction Locking
HBASE-1476 Multithreaded Compactions
TASKS TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -19,34 +19,32 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.util.concurrent.Executors;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Bytes; import com.google.common.base.Preconditions;
import org.apache.hadoop.util.StringUtils;
/** /**
* Compact region on request and then run split if appropriate * Compact region on request and then run split if appropriate
*/ */
public class CompactSplitThread extends Thread implements CompactionRequestor { public class CompactSplitThread implements CompactionRequestor {
static final Log LOG = LogFactory.getLog(CompactSplitThread.class); static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
private final long frequency;
private final ReentrantLock lock = new ReentrantLock();
private final HRegionServer server; private final HRegionServer server;
private final Configuration conf; private final Configuration conf;
protected final BlockingQueue<CompactionRequest> compactionQueue = private final ThreadPoolExecutor largeCompactions;
new PriorityBlockingQueue<CompactionRequest>(); private final ThreadPoolExecutor smallCompactions;
private final ThreadPoolExecutor splits;
private final long throttleSize;
/* The default priority for user-specified compaction requests. /* The default priority for user-specified compaction requests.
* The user gets top priority unless we have blocking compactions. (Pri <= 0) * The user gets top priority unless we have blocking compactions. (Pri <= 0)
@ -62,85 +60,71 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
private int regionSplitLimit; private int regionSplitLimit;
/** @param server */ /** @param server */
public CompactSplitThread(HRegionServer server) { CompactSplitThread(HRegionServer server) {
super(); super();
this.server = server; this.server = server;
this.conf = server.getConfiguration(); this.conf = server.getConfiguration();
this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit", this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit",
Integer.MAX_VALUE); Integer.MAX_VALUE);
this.frequency =
conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", int largeThreads = Math.max(1, conf.getInt(
20 * 1000); "hbase.regionserver.thread.compaction.large", 1));
int smallThreads = conf.getInt(
"hbase.regionserver.thread.compaction.small", 0);
throttleSize = conf.getLong(
"hbase.regionserver.thread.compaction.throttle", 0);
int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
// if we have throttle threads, make sure the user also specified size
Preconditions.checkArgument(smallThreads == 0 || throttleSize > 0);
this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
this.largeCompactions
.setRejectedExecutionHandler(new CompactionRequest.Rejection());
if (smallThreads <= 0) {
this.smallCompactions = null;
} else {
this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
this.smallCompactions
.setRejectedExecutionHandler(new CompactionRequest.Rejection());
}
this.splits = (ThreadPoolExecutor) Executors
.newFixedThreadPool(splitThreads);
} }
@Override @Override
public void run() { public String toString() {
while (!this.server.isStopped()) { return "compaction_queue="
CompactionRequest compactionRequest = null; + (smallCompactions != null ? "("
HRegion r = null; + largeCompactions.getQueue().size() + ":"
boolean completed = false; + smallCompactions.getQueue().size() + ")"
try { : largeCompactions.getQueue().size())
compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); + ", split_queue=" + splits.getQueue().size();
if (compactionRequest != null) {
r = compactionRequest.getHRegion();
lock.lock();
try {
// look for a split first
if(!this.server.isStopped()) {
// don't split regions that are blocking
if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
byte[] midkey = compactionRequest.getStore().checkSplit();
if (midkey != null) {
split(r, midkey);
continue;
}
}
} }
// now test for compaction public synchronized boolean requestSplit(final HRegion r) {
if(!this.server.isStopped()) { // don't split regions that are blocking
long startTime = EnvironmentEdgeManager.currentTimeMillis(); if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
completed = r.compact(compactionRequest); byte[] midKey = r.checkSplit();
long now = EnvironmentEdgeManager.currentTimeMillis(); if (midKey != null) {
LOG.info(((completed) ? "completed" : "aborted") requestSplit(r, midKey);
+ " compaction: " + compactionRequest + ", duration=" return true;
+ StringUtils.formatTimeDiff(now, startTime));
if (completed) { // compaction aborted?
this.server.getMetrics().
addCompaction(now - startTime, compactionRequest.getSize());
} }
} }
} finally { return false;
lock.unlock();
} }
public synchronized void requestSplit(final HRegion r, byte[] midKey) {
try {
this.splits.execute(new SplitRequest(r, midKey, this.server));
if (LOG.isDebugEnabled()) {
LOG.debug("Split requested for " + r + ". " + this);
} }
} catch (InterruptedException ex) { } catch (RejectedExecutionException ree) {
continue; LOG.info("Could not execute split for " + r, ree);
} catch (IOException ex) {
LOG.error("Compaction/Split failed " + compactionRequest,
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
break;
} }
} catch (Exception ex) {
LOG.error("Compaction failed " + compactionRequest, ex);
if (!server.checkFileSystem()) {
break;
}
} finally {
if (compactionRequest != null) {
Store s = compactionRequest.getStore();
s.finishRequest(compactionRequest);
// degenerate case: blocked regions require recursive enqueues
if (s.getCompactPriority() < PRIORITY_USER && completed) {
requestCompaction(r, s, "Recursive enqueue");
}
}
compactionRequest = null;
}
}
compactionQueue.clear();
LOG.info(getName() + " exiting");
} }
public synchronized void requestCompaction(final HRegion r, public synchronized void requestCompaction(final HRegion r,
@ -164,7 +148,7 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
/** /**
* @param r HRegion store belongs to * @param r HRegion store belongs to
* @param force Whether next compaction should be major * @param s Store to request compaction on
* @param why Why compaction requested -- used in debug messages * @param why Why compaction requested -- used in debug messages
* @param priority override the default priority (NO_PRIORITY == decide) * @param priority override the default priority (NO_PRIORITY == decide)
*/ */
@ -175,64 +159,55 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
} }
CompactionRequest cr = s.requestCompaction(); CompactionRequest cr = s.requestCompaction();
if (cr != null) { if (cr != null) {
cr.setServer(server);
if (priority != NO_PRIORITY) { if (priority != NO_PRIORITY) {
cr.setPriority(priority); cr.setPriority(priority);
} }
boolean addedToQueue = compactionQueue.add(cr); ThreadPoolExecutor pool = largeCompactions;
if (!addedToQueue) { if (smallCompactions != null && throttleSize > cr.getSize()) {
LOG.error("Could not add request to compaction queue: " + cr); // smallCompactions is like the 10 items or less line at Walmart
s.finishRequest(cr); pool = smallCompactions;
} else if (LOG.isDebugEnabled()) { }
LOG.debug("Compaction requested: " + cr pool.execute(cr);
if (LOG.isDebugEnabled()) {
String type = "";
if (smallCompactions != null) {
type = (pool == smallCompactions) ? "Small " : "Large ";
}
LOG.debug(type + "Compaction requested: " + cr
+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ "; Priority: " + priority + "; Compaction queue size: " + "; " + this);
+ compactionQueue.size());
} }
} }
} }
private void split(final HRegion parent, final byte [] midKey)
throws IOException {
final long startTime = System.currentTimeMillis();
SplitTransaction st = new SplitTransaction(parent, midKey);
// If prepare does not return true, for some reason -- logged inside in
// the prepare call -- we are not ready to split just now. Just return.
if (!st.prepare()) return;
try {
st.execute(this.server, this.server);
} catch (Exception e) {
try {
LOG.info("Running rollback of failed split of " +
parent.getRegionNameAsString() + "; " + e.getMessage());
st.rollback(this.server, this.server);
LOG.info("Successful rollback of failed split of " +
parent.getRegionNameAsString());
} catch (Exception ee) {
// If failed rollback, kill this server to avoid having a hole in table.
LOG.info("Failed rollback of failed split of " +
parent.getRegionNameAsString() + " -- aborting server", ee);
this.server.abort("Failed split");
}
return;
}
LOG.info("Region split, META updated, and report to master. Parent=" +
parent.getRegionInfo().getRegionNameAsString() + ", new regions: " +
st.getFirstDaughter().getRegionNameAsString() + ", " +
st.getSecondDaughter().getRegionNameAsString() + ". Split took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
}
/** /**
* Only interrupt once it's done with a run through the work loop. * Only interrupt once it's done with a run through the work loop.
*/ */
void interruptIfNecessary() { void interruptIfNecessary() {
if (lock.tryLock()) { splits.shutdown();
try { largeCompactions.shutdown();
this.interrupt(); if (smallCompactions != null)
} finally { smallCompactions.shutdown();
lock.unlock();
} }
private void waitFor(ThreadPoolExecutor t, String name) {
boolean done = false;
while (!done) {
try {
done = t.awaitTermination(60, TimeUnit.SECONDS);
LOG.debug("Waiting for " + name + " to finish...");
} catch (InterruptedException ie) {
LOG.debug("Interrupted waiting for " + name + " to finish...");
}
}
}
void join() {
waitFor(splits, "Split Thread");
waitFor(largeCompactions, "Large Compaction Thread");
if (smallCompactions != null) {
waitFor(smallCompactions, "Small Compaction Thread");
} }
} }
@ -243,7 +218,10 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
* @return The current size of the regions queue. * @return The current size of the regions queue.
*/ */
public int getCompactionQueueSize() { public int getCompactionQueueSize() {
return compactionQueue.size(); int size = largeCompactions.getQueue().size();
if (smallCompactions != null)
size += smallCompactions.getQueue().size();
return size;
} }
private boolean shouldSplitRegion() { private boolean shouldSplitRegion() {

View File

@ -890,7 +890,8 @@ public class HRegion implements HeapSize { // , Writable{
return false; return false;
} }
} }
LOG.info("Starting compaction on region " + this); LOG.info("Starting compaction on " + cr.getStore() + " in region "
+ this);
doRegionCompactionPrep(); doRegionCompactionPrep();
try { try {
status.setStatus("Compacting store " + cr.getStore()); status.setStatus("Compacting store " + cr.getStore());
@ -3707,6 +3708,20 @@ public class HRegion implements HeapSize { // , Writable{
// nothing // nothing
} }
byte[] checkSplit() {
if (this.splitPoint != null) {
return this.splitPoint;
}
byte[] splitPoint = null;
for (Store s : stores.values()) {
splitPoint = s.checkSplit();
if (splitPoint != null) {
return splitPoint;
}
}
return null;
}
/** /**
* @return The priority that this region should have in the compaction queue * @return The priority that this region should have in the compaction queue
*/ */

View File

@ -226,7 +226,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
private RegionServerMetrics metrics; private RegionServerMetrics metrics;
// Compactions // Compactions
CompactSplitThread compactSplitThread; public CompactSplitThread compactSplitThread;
// Cache flushing // Cache flushing
MemStoreFlusher cacheFlusher; MemStoreFlusher cacheFlusher;
@ -1017,7 +1017,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
* *
* @return false if file system is not available * @return false if file system is not available
*/ */
protected boolean checkFileSystem() { public boolean checkFileSystem() {
if (this.fsOk && this.fs != null) { if (this.fsOk && this.fs != null) {
try { try {
FSUtils.checkFileSystemAvailable(this.fs); FSUtils.checkFileSystemAvailable(this.fs);
@ -1247,8 +1247,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler); Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler);
Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher", Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
handler); handler);
Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
handler);
Threads.setDaemonThreadRunning(this.majorCompactionChecker, n + Threads.setDaemonThreadRunning(this.majorCompactionChecker, n +
".majorCompactionChecker", handler); ".majorCompactionChecker", handler);
@ -1316,7 +1314,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
return false; return false;
} }
// Verify that all threads are alive // Verify that all threads are alive
if (!(leases.isAlive() && compactSplitThread.isAlive() if (!(leases.isAlive()
&& cacheFlusher.isAlive() && hlogRoller.isAlive() && cacheFlusher.isAlive() && hlogRoller.isAlive()
&& this.majorCompactionChecker.isAlive())) { && this.majorCompactionChecker.isAlive())) {
stop("One or more threads are no longer alive -- stop"); stop("One or more threads are no longer alive -- stop");
@ -1434,8 +1432,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
protected void join() { protected void join() {
Threads.shutdown(this.majorCompactionChecker); Threads.shutdown(this.majorCompactionChecker);
Threads.shutdown(this.cacheFlusher); Threads.shutdown(this.cacheFlusher);
Threads.shutdown(this.compactSplitThread);
Threads.shutdown(this.hlogRoller); Threads.shutdown(this.hlogRoller);
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
}
if (this.service != null) this.service.shutdown(); if (this.service != null) this.service.shutdown();
if (this.replicationHandler != null) { if (this.replicationHandler != null) {
this.replicationHandler.join(); this.replicationHandler.join();
@ -2338,11 +2338,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
HRegion region = getRegion(regionInfo.getRegionName()); HRegion region = getRegion(regionInfo.getRegionName());
region.flushcache(); region.flushcache();
region.forceSplit(splitPoint); region.forceSplit(splitPoint);
// force a compaction, split will be side-effect compactSplitThread.requestSplit(region, region.checkSplit());
// TODO: flush/compact/split refactor will make it trivial to do this
// sync/async (and won't require us to do a compaction to split!)
compactSplitThread.requestCompaction(region, "User-triggered split",
CompactSplitThread.PRIORITY_USER);
} }
@Override @Override

View File

@ -354,7 +354,9 @@ class MemStoreFlusher extends Thread implements FlushRequester {
LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
"store files; delaying flush up to " + this.blockingWaitTime + "ms"); "store files; delaying flush up to " + this.blockingWaitTime + "ms");
} }
if (!this.server.compactSplitThread.requestSplit(region)) {
this.server.compactSplitThread.requestCompaction(region, getName()); this.server.compactSplitThread.requestCompaction(region, getName());
}
// Put back on the queue. Have it come back out of the queue // Put back on the queue. Have it come back out of the queue
// after a delay of this.blockingWaitTime / 100 ms. // after a delay of this.blockingWaitTime / 100 ms.
this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));

View File

@ -0,0 +1,76 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
/**
* Handles processing region splits. Put in a queue, owned by HRegionServer.
*/
class SplitRequest implements Runnable {
static final Log LOG = LogFactory.getLog(SplitRequest.class);
private final HRegion parent;
private final byte[] midKey;
private final HRegionServer server;
SplitRequest(HRegion region, byte[] midKey, HRegionServer hrs) {
Preconditions.checkNotNull(hrs);
this.parent = region;
this.midKey = midKey;
this.server = hrs;
}
@Override
public String toString() {
return "regionName=" + parent + ", midKey=" + Bytes.toStringBinary(midKey);
}
@Override
public void run() {
try {
final long startTime = System.currentTimeMillis();
SplitTransaction st = new SplitTransaction(parent, midKey);
// If prepare does not return true, for some reason -- logged inside in
// the prepare call -- we are not ready to split just now. Just return.
if (!st.prepare()) return;
try {
st.execute(this.server, this.server);
} catch (Exception e) {
try {
LOG.info("Running rollback of failed split of " + parent + "; "
+ e.getMessage());
st.rollback(this.server, this.server);
LOG.info("Successful rollback of failed split of " + parent);
} catch (RuntimeException ee) {
// If failed rollback, kill server to avoid having a hole in table.
LOG.info("Failed rollback of failed split of "
+ parent.getRegionNameAsString() + " -- aborting server", ee);
this.server.abort("Failed split");
}
return;
}
LOG.info("Region split, META updated, and report to master. Parent="
+ parent.getRegionInfo().getRegionNameAsString() + ", new regions: "
+ st.getFirstDaughter().getRegionNameAsString() + ", "
+ st.getSecondDaughter().getRegionNameAsString() + ". Split took "
+ StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
} catch (IOException ex) {
LOG.error("Split failed " + this, RemoteExceptionHandler
.checkIOException(ex));
server.checkFileSystem();
}
}
}

View File

@ -695,7 +695,7 @@ public class Store implements HeapSize {
StoreFile last = filesCompacting.get(filesCompacting.size() - 1); StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = filesToCompact.indexOf(last); int idx = filesToCompact.indexOf(last);
Preconditions.checkArgument(idx != -1); Preconditions.checkArgument(idx != -1);
filesToCompact = filesToCompact.subList(idx+1, filesToCompact.size()); filesToCompact.subList(0, idx + 1).clear();
} }
int count = filesToCompact.size(); int count = filesToCompact.size();
if (N > count) { if (N > count) {
@ -868,7 +868,7 @@ public class Store implements HeapSize {
StoreFile last = filesCompacting.get(filesCompacting.size() - 1); StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = candidates.indexOf(last); int idx = candidates.indexOf(last);
Preconditions.checkArgument(idx != -1); Preconditions.checkArgument(idx != -1);
candidates = candidates.subList(idx + 1, candidates.size()); candidates.subList(0, idx + 1).clear();
} }
List<StoreFile> filesToCompact = compactSelection(candidates); List<StoreFile> filesToCompact = compactSelection(candidates);
@ -974,6 +974,11 @@ public class Store implements HeapSize {
int start = 0; int start = 0;
double r = this.compactRatio; double r = this.compactRatio;
// skip selection algorithm if we don't have enough files
if (filesToCompact.size() < this.minFilesToCompact) {
return Collections.emptyList();
}
/* TODO: add sorting + unit test back in when HBASE-2856 is fixed /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
// Sort files by size to correct when normal skew is altered by bulk load. // Sort files by size to correct when normal skew is altered by bulk load.
Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE); Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
@ -1390,7 +1395,7 @@ public class Store implements HeapSize {
* Determines if Store should be split * Determines if Store should be split
* @return byte[] if store should be split, null otherwise. * @return byte[] if store should be split, null otherwise.
*/ */
byte[] checkSplit() { public byte[] checkSplit() {
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
boolean force = this.region.shouldForceSplit(); boolean force = this.region.shouldForceSplit();

View File

@ -19,20 +19,33 @@
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
/** /**
* This class represents a compaction request and holds the region, priority, * This class holds all details necessary to run a compaction.
* and time submitted.
*/ */
public class CompactionRequest implements Comparable<CompactionRequest> { public class CompactionRequest implements Comparable<CompactionRequest>,
Runnable {
static final Log LOG = LogFactory.getLog(CompactionRequest.class); static final Log LOG = LogFactory.getLog(CompactionRequest.class);
private final HRegion r; private final HRegion r;
private final Store s; private final Store s;
@ -41,20 +54,12 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
private final boolean isMajor; private final boolean isMajor;
private int p; private int p;
private final Date date; private final Date date;
private HRegionServer server = null;
public CompactionRequest(HRegion r, Store s) {
this(r, s, null, false, s.getCompactPriority());
}
public CompactionRequest(HRegion r, Store s, int p) {
this(r, s, null, false, p);
}
public CompactionRequest(HRegion r, Store s, public CompactionRequest(HRegion r, Store s,
List<StoreFile> files, boolean isMajor, int p) { List<StoreFile> files, boolean isMajor, int p) {
if (r == null) { Preconditions.checkNotNull(r);
throw new NullPointerException("HRegion cannot be null"); Preconditions.checkNotNull(files);
}
this.r = r; this.r = r;
this.s = s; this.s = s;
@ -136,10 +141,77 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
this.p = p; this.p = p;
} }
public void setServer(HRegionServer hrs) {
this.server = hrs;
}
@Override
public String toString() { public String toString() {
String fsList = Joiner.on(", ").join(
Collections2.transform(Collections2.filter(files,
new Predicate<StoreFile>() {
public boolean apply(StoreFile sf) {
return sf.getReader() != null;
}
}), new Function<StoreFile, String>() {
public String apply(StoreFile sf) {
return StringUtils.humanReadableInt(sf.getReader().length());
}
}));
return "regionName=" + r.getRegionNameAsString() + return "regionName=" + r.getRegionNameAsString() +
", storeName=" + new String(s.getFamily().getName()) + ", storeName=" + new String(s.getFamily().getName()) +
", fileCount=" + files.size() + ", fileCount=" + files.size() +
", fileSize=" + StringUtils.humanReadableInt(totalSize) +
((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
", priority=" + p + ", date=" + date; ", priority=" + p + ", date=" + date;
} }
@Override
public void run() {
Preconditions.checkNotNull(server);
if (server.isStopped()) {
return;
}
try {
long start = EnvironmentEdgeManager.currentTimeMillis();
boolean completed = r.compact(this);
long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
this + "; duration=" + StringUtils.formatTimeDiff(now, start));
if (completed) {
server.getMetrics().addCompaction(now - start, this.totalSize);
// degenerate case: blocked regions require recursive enqueues
if (s.getCompactPriority() <= 0) {
server.compactSplitThread
.requestCompaction(r, s, "Recursive enqueue");
}
}
} catch (IOException ex) {
LOG.error("Compaction failed " + this, RemoteExceptionHandler
.checkIOException(ex));
server.checkFileSystem();
} catch (Exception ex) {
LOG.error("Compaction failed " + this, ex);
server.checkFileSystem();
} finally {
s.finishRequest(this);
LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
}
}
/**
* Cleanup class to use when rejecting a compaction request from the queue.
*/
public static class Rejection implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) {
if (request instanceof CompactionRequest) {
CompactionRequest cr = (CompactionRequest) request;
LOG.debug("Compaction Rejected: " + cr);
cr.getStore().finishRequest(cr);
}
}
}
} }