diff --git a/CHANGES.txt b/CHANGES.txt index 955d79f38b0..2a30030bf39 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -212,6 +212,8 @@ Release 0.91.0 - Unreleased HBASE-3721 Speedup LoadIncrementalHFiles (Ted Yu) HBASE-3855 Performance degradation of memstore because reseek is linear (dhruba borthakur) + HBASE-3797 StoreFile Level Compaction Locking + HBASE-1476 Multithreaded Compactions TASKS HBASE-3559 Move report of split to master OFF the heartbeat channel diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 0b3bec81eb3..8326c02cca8 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -19,34 +19,32 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Preconditions; /** * Compact region on request and then run split if appropriate */ -public class CompactSplitThread extends Thread implements CompactionRequestor { +public class CompactSplitThread implements CompactionRequestor { static final Log LOG = LogFactory.getLog(CompactSplitThread.class); - private final long frequency; - private final ReentrantLock lock = new ReentrantLock(); private final HRegionServer server; private final Configuration conf; - protected final BlockingQueue compactionQueue = - new PriorityBlockingQueue(); + private final ThreadPoolExecutor largeCompactions; + private final ThreadPoolExecutor smallCompactions; + private final ThreadPoolExecutor splits; + private final long throttleSize; /* The default priority for user-specified compaction requests. * The user gets top priority unless we have blocking compactions. (Pri <= 0) @@ -62,85 +60,71 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { private int regionSplitLimit; /** @param server */ - public CompactSplitThread(HRegionServer server) { + CompactSplitThread(HRegionServer server) { super(); this.server = server; this.conf = server.getConfiguration(); this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit", Integer.MAX_VALUE); - this.frequency = - conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", - 20 * 1000); + + int largeThreads = Math.max(1, conf.getInt( + "hbase.regionserver.thread.compaction.large", 1)); + int smallThreads = conf.getInt( + "hbase.regionserver.thread.compaction.small", 0); + throttleSize = conf.getLong( + "hbase.regionserver.thread.compaction.throttle", 0); + int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1); + + // if we have throttle threads, make sure the user also specified size + Preconditions.checkArgument(smallThreads == 0 || throttleSize > 0); + + this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, + 60, TimeUnit.SECONDS, new PriorityBlockingQueue()); + this.largeCompactions + .setRejectedExecutionHandler(new CompactionRequest.Rejection()); + if (smallThreads <= 0) { + this.smallCompactions = null; + } else { + this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, + 60, TimeUnit.SECONDS, new PriorityBlockingQueue()); + this.smallCompactions + .setRejectedExecutionHandler(new CompactionRequest.Rejection()); + } + this.splits = (ThreadPoolExecutor) Executors + .newFixedThreadPool(splitThreads); } @Override - public void run() { - while (!this.server.isStopped()) { - CompactionRequest compactionRequest = null; - HRegion r = null; - boolean completed = false; - try { - compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); - if (compactionRequest != null) { - r = compactionRequest.getHRegion(); - lock.lock(); - try { - // look for a split first - if(!this.server.isStopped()) { - // don't split regions that are blocking - if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) { - byte[] midkey = compactionRequest.getStore().checkSplit(); - if (midkey != null) { - split(r, midkey); - continue; - } - } - } + public String toString() { + return "compaction_queue=" + + (smallCompactions != null ? "(" + + largeCompactions.getQueue().size() + ":" + + smallCompactions.getQueue().size() + ")" + : largeCompactions.getQueue().size()) + + ", split_queue=" + splits.getQueue().size(); + } - // now test for compaction - if(!this.server.isStopped()) { - long startTime = EnvironmentEdgeManager.currentTimeMillis(); - completed = r.compact(compactionRequest); - long now = EnvironmentEdgeManager.currentTimeMillis(); - LOG.info(((completed) ? "completed" : "aborted") - + " compaction: " + compactionRequest + ", duration=" - + StringUtils.formatTimeDiff(now, startTime)); - if (completed) { // compaction aborted? - this.server.getMetrics(). - addCompaction(now - startTime, compactionRequest.getSize()); - } - } - } finally { - lock.unlock(); - } - } - } catch (InterruptedException ex) { - continue; - } catch (IOException ex) { - LOG.error("Compaction/Split failed " + compactionRequest, - RemoteExceptionHandler.checkIOException(ex)); - if (!server.checkFileSystem()) { - break; - } - } catch (Exception ex) { - LOG.error("Compaction failed " + compactionRequest, ex); - if (!server.checkFileSystem()) { - break; - } - } finally { - if (compactionRequest != null) { - Store s = compactionRequest.getStore(); - s.finishRequest(compactionRequest); - // degenerate case: blocked regions require recursive enqueues - if (s.getCompactPriority() < PRIORITY_USER && completed) { - requestCompaction(r, s, "Recursive enqueue"); - } - } - compactionRequest = null; + public synchronized boolean requestSplit(final HRegion r) { + // don't split regions that are blocking + if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) { + byte[] midKey = r.checkSplit(); + if (midKey != null) { + requestSplit(r, midKey); + return true; } } - compactionQueue.clear(); - LOG.info(getName() + " exiting"); + return false; + } + + public synchronized void requestSplit(final HRegion r, byte[] midKey) { + try { + this.splits.execute(new SplitRequest(r, midKey, this.server)); + if (LOG.isDebugEnabled()) { + LOG.debug("Split requested for " + r + ". " + this); + } + } catch (RejectedExecutionException ree) { + LOG.info("Could not execute split for " + r, ree); + } } public synchronized void requestCompaction(final HRegion r, @@ -164,7 +148,7 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { /** * @param r HRegion store belongs to - * @param force Whether next compaction should be major + * @param s Store to request compaction on * @param why Why compaction requested -- used in debug messages * @param priority override the default priority (NO_PRIORITY == decide) */ @@ -175,67 +159,58 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { } CompactionRequest cr = s.requestCompaction(); if (cr != null) { + cr.setServer(server); if (priority != NO_PRIORITY) { cr.setPriority(priority); } - boolean addedToQueue = compactionQueue.add(cr); - if (!addedToQueue) { - LOG.error("Could not add request to compaction queue: " + cr); - s.finishRequest(cr); - } else if (LOG.isDebugEnabled()) { - LOG.debug("Compaction requested: " + cr + ThreadPoolExecutor pool = largeCompactions; + if (smallCompactions != null && throttleSize > cr.getSize()) { + // smallCompactions is like the 10 items or less line at Walmart + pool = smallCompactions; + } + pool.execute(cr); + if (LOG.isDebugEnabled()) { + String type = ""; + if (smallCompactions != null) { + type = (pool == smallCompactions) ? "Small " : "Large "; + } + LOG.debug(type + "Compaction requested: " + cr + (why != null && !why.isEmpty() ? "; Because: " + why : "") - + "; Priority: " + priority + "; Compaction queue size: " - + compactionQueue.size()); + + "; " + this); } } } - private void split(final HRegion parent, final byte [] midKey) - throws IOException { - final long startTime = System.currentTimeMillis(); - SplitTransaction st = new SplitTransaction(parent, midKey); - // If prepare does not return true, for some reason -- logged inside in - // the prepare call -- we are not ready to split just now. Just return. - if (!st.prepare()) return; - try { - st.execute(this.server, this.server); - } catch (Exception e) { - try { - LOG.info("Running rollback of failed split of " + - parent.getRegionNameAsString() + "; " + e.getMessage()); - st.rollback(this.server, this.server); - LOG.info("Successful rollback of failed split of " + - parent.getRegionNameAsString()); - } catch (Exception ee) { - // If failed rollback, kill this server to avoid having a hole in table. - LOG.info("Failed rollback of failed split of " + - parent.getRegionNameAsString() + " -- aborting server", ee); - this.server.abort("Failed split"); - } - return; - } - - LOG.info("Region split, META updated, and report to master. Parent=" + - parent.getRegionInfo().getRegionNameAsString() + ", new regions: " + - st.getFirstDaughter().getRegionNameAsString() + ", " + - st.getSecondDaughter().getRegionNameAsString() + ". Split took " + - StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); - } - /** * Only interrupt once it's done with a run through the work loop. */ void interruptIfNecessary() { - if (lock.tryLock()) { + splits.shutdown(); + largeCompactions.shutdown(); + if (smallCompactions != null) + smallCompactions.shutdown(); + } + + private void waitFor(ThreadPoolExecutor t, String name) { + boolean done = false; + while (!done) { try { - this.interrupt(); - } finally { - lock.unlock(); + done = t.awaitTermination(60, TimeUnit.SECONDS); + LOG.debug("Waiting for " + name + " to finish..."); + } catch (InterruptedException ie) { + LOG.debug("Interrupted waiting for " + name + " to finish..."); } } } + void join() { + waitFor(splits, "Split Thread"); + waitFor(largeCompactions, "Large Compaction Thread"); + if (smallCompactions != null) { + waitFor(smallCompactions, "Small Compaction Thread"); + } + } + /** * Returns the current size of the queue containing regions that are * processed. @@ -243,7 +218,10 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { * @return The current size of the regions queue. */ public int getCompactionQueueSize() { - return compactionQueue.size(); + int size = largeCompactions.getQueue().size(); + if (smallCompactions != null) + size += smallCompactions.getQueue().size(); + return size; } private boolean shouldSplitRegion() { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2a7493b6de7..c2e51b89977 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -890,7 +890,8 @@ public class HRegion implements HeapSize { // , Writable{ return false; } } - LOG.info("Starting compaction on region " + this); + LOG.info("Starting compaction on " + cr.getStore() + " in region " + + this); doRegionCompactionPrep(); try { status.setStatus("Compacting store " + cr.getStore()); @@ -3707,6 +3708,20 @@ public class HRegion implements HeapSize { // , Writable{ // nothing } + byte[] checkSplit() { + if (this.splitPoint != null) { + return this.splitPoint; + } + byte[] splitPoint = null; + for (Store s : stores.values()) { + splitPoint = s.checkSplit(); + if (splitPoint != null) { + return splitPoint; + } + } + return null; + } + /** * @return The priority that this region should have in the compaction queue */ diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 98452f9f61f..9dbf7e16919 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -226,7 +226,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, private RegionServerMetrics metrics; // Compactions - CompactSplitThread compactSplitThread; + public CompactSplitThread compactSplitThread; // Cache flushing MemStoreFlusher cacheFlusher; @@ -1017,7 +1017,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, * * @return false if file system is not available */ - protected boolean checkFileSystem() { + public boolean checkFileSystem() { if (this.fsOk && this.fs != null) { try { FSUtils.checkFileSystemAvailable(this.fs); @@ -1247,8 +1247,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler); Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher", handler); - Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor", - handler); Threads.setDaemonThreadRunning(this.majorCompactionChecker, n + ".majorCompactionChecker", handler); @@ -1316,7 +1314,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return false; } // Verify that all threads are alive - if (!(leases.isAlive() && compactSplitThread.isAlive() + if (!(leases.isAlive() && cacheFlusher.isAlive() && hlogRoller.isAlive() && this.majorCompactionChecker.isAlive())) { stop("One or more threads are no longer alive -- stop"); @@ -1434,8 +1432,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, protected void join() { Threads.shutdown(this.majorCompactionChecker); Threads.shutdown(this.cacheFlusher); - Threads.shutdown(this.compactSplitThread); Threads.shutdown(this.hlogRoller); + if (this.compactSplitThread != null) { + this.compactSplitThread.join(); + } if (this.service != null) this.service.shutdown(); if (this.replicationHandler != null) { this.replicationHandler.join(); @@ -2338,11 +2338,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, HRegion region = getRegion(regionInfo.getRegionName()); region.flushcache(); region.forceSplit(splitPoint); - // force a compaction, split will be side-effect - // TODO: flush/compact/split refactor will make it trivial to do this - // sync/async (and won't require us to do a compaction to split!) - compactSplitThread.requestCompaction(region, "User-triggered split", - CompactSplitThread.PRIORITY_USER); + compactSplitThread.requestSplit(region, region.checkSplit()); } @Override diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index f66a7cdf008..213b2ab886b 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -354,7 +354,9 @@ class MemStoreFlusher extends Thread implements FlushRequester { LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); } - this.server.compactSplitThread.requestCompaction(region, getName()); + if (!this.server.compactSplitThread.requestSplit(region)) { + this.server.compactSplitThread.requestCompaction(region, getName()); + } // Put back on the queue. Have it come back out of the queue // after a delay of this.blockingWaitTime / 100 ms. this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java new file mode 100644 index 00000000000..dda226310c4 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -0,0 +1,76 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Preconditions; + +/** + * Handles processing region splits. Put in a queue, owned by HRegionServer. + */ +class SplitRequest implements Runnable { + static final Log LOG = LogFactory.getLog(SplitRequest.class); + private final HRegion parent; + private final byte[] midKey; + private final HRegionServer server; + + SplitRequest(HRegion region, byte[] midKey, HRegionServer hrs) { + Preconditions.checkNotNull(hrs); + this.parent = region; + this.midKey = midKey; + this.server = hrs; + } + + @Override + public String toString() { + return "regionName=" + parent + ", midKey=" + Bytes.toStringBinary(midKey); + } + + @Override + public void run() { + try { + final long startTime = System.currentTimeMillis(); + SplitTransaction st = new SplitTransaction(parent, midKey); + // If prepare does not return true, for some reason -- logged inside in + // the prepare call -- we are not ready to split just now. Just return. + if (!st.prepare()) return; + try { + st.execute(this.server, this.server); + } catch (Exception e) { + try { + LOG.info("Running rollback of failed split of " + parent + "; " + + e.getMessage()); + st.rollback(this.server, this.server); + LOG.info("Successful rollback of failed split of " + parent); + } catch (RuntimeException ee) { + // If failed rollback, kill server to avoid having a hole in table. + LOG.info("Failed rollback of failed split of " + + parent.getRegionNameAsString() + " -- aborting server", ee); + this.server.abort("Failed split"); + } + return; + } + LOG.info("Region split, META updated, and report to master. Parent=" + + parent.getRegionInfo().getRegionNameAsString() + ", new regions: " + + st.getFirstDaughter().getRegionNameAsString() + ", " + + st.getSecondDaughter().getRegionNameAsString() + ". Split took " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); + } catch (IOException ex) { + LOG.error("Split failed " + this, RemoteExceptionHandler + .checkIOException(ex)); + server.checkFileSystem(); + } + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 445bd0b74ad..21468ad0a33 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -695,7 +695,7 @@ public class Store implements HeapSize { StoreFile last = filesCompacting.get(filesCompacting.size() - 1); int idx = filesToCompact.indexOf(last); Preconditions.checkArgument(idx != -1); - filesToCompact = filesToCompact.subList(idx+1, filesToCompact.size()); + filesToCompact.subList(0, idx + 1).clear(); } int count = filesToCompact.size(); if (N > count) { @@ -868,7 +868,7 @@ public class Store implements HeapSize { StoreFile last = filesCompacting.get(filesCompacting.size() - 1); int idx = candidates.indexOf(last); Preconditions.checkArgument(idx != -1); - candidates = candidates.subList(idx + 1, candidates.size()); + candidates.subList(0, idx + 1).clear(); } List filesToCompact = compactSelection(candidates); @@ -974,6 +974,11 @@ public class Store implements HeapSize { int start = 0; double r = this.compactRatio; + // skip selection algorithm if we don't have enough files + if (filesToCompact.size() < this.minFilesToCompact) { + return Collections.emptyList(); + } + /* TODO: add sorting + unit test back in when HBASE-2856 is fixed // Sort files by size to correct when normal skew is altered by bulk load. Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE); @@ -1390,7 +1395,7 @@ public class Store implements HeapSize { * Determines if Store should be split * @return byte[] if store should be split, null otherwise. */ - byte[] checkSplit() { + public byte[] checkSplit() { this.lock.readLock().lock(); try { boolean force = this.region.shouldForceSplit(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index 28114ef585c..a507f12dd98 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -19,20 +19,33 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import java.io.IOException; import java.util.Date; import java.util.List; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.StringUtils; - /** - * This class represents a compaction request and holds the region, priority, - * and time submitted. - */ - public class CompactionRequest implements Comparable { +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; + +/** + * This class holds all details necessary to run a compaction. + */ +public class CompactionRequest implements Comparable, + Runnable { static final Log LOG = LogFactory.getLog(CompactionRequest.class); private final HRegion r; private final Store s; @@ -41,20 +54,12 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; private final boolean isMajor; private int p; private final Date date; - - public CompactionRequest(HRegion r, Store s) { - this(r, s, null, false, s.getCompactPriority()); - } - - public CompactionRequest(HRegion r, Store s, int p) { - this(r, s, null, false, p); - } + private HRegionServer server = null; public CompactionRequest(HRegion r, Store s, List files, boolean isMajor, int p) { - if (r == null) { - throw new NullPointerException("HRegion cannot be null"); - } + Preconditions.checkNotNull(r); + Preconditions.checkNotNull(files); this.r = r; this.s = s; @@ -136,10 +141,77 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; this.p = p; } + public void setServer(HRegionServer hrs) { + this.server = hrs; + } + + @Override public String toString() { + String fsList = Joiner.on(", ").join( + Collections2.transform(Collections2.filter(files, + new Predicate() { + public boolean apply(StoreFile sf) { + return sf.getReader() != null; + } + }), new Function() { + public String apply(StoreFile sf) { + return StringUtils.humanReadableInt(sf.getReader().length()); + } + })); + return "regionName=" + r.getRegionNameAsString() + ", storeName=" + new String(s.getFamily().getName()) + ", fileCount=" + files.size() + + ", fileSize=" + StringUtils.humanReadableInt(totalSize) + + ((fsList.isEmpty()) ? "" : " (" + fsList + ")") + ", priority=" + p + ", date=" + date; } + + @Override + public void run() { + Preconditions.checkNotNull(server); + if (server.isStopped()) { + return; + } + try { + long start = EnvironmentEdgeManager.currentTimeMillis(); + boolean completed = r.compact(this); + long now = EnvironmentEdgeManager.currentTimeMillis(); + LOG.info(((completed) ? "completed" : "aborted") + " compaction: " + + this + "; duration=" + StringUtils.formatTimeDiff(now, start)); + if (completed) { + server.getMetrics().addCompaction(now - start, this.totalSize); + // degenerate case: blocked regions require recursive enqueues + if (s.getCompactPriority() <= 0) { + server.compactSplitThread + .requestCompaction(r, s, "Recursive enqueue"); + } + } + } catch (IOException ex) { + LOG.error("Compaction failed " + this, RemoteExceptionHandler + .checkIOException(ex)); + server.checkFileSystem(); + } catch (Exception ex) { + LOG.error("Compaction failed " + this, ex); + server.checkFileSystem(); + } finally { + s.finishRequest(this); + LOG.debug("CompactSplitThread Status: " + server.compactSplitThread); + } + } + + /** + * Cleanup class to use when rejecting a compaction request from the queue. + */ + public static class Rejection implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) { + if (request instanceof CompactionRequest) { + CompactionRequest cr = (CompactionRequest) request; + LOG.debug("Compaction Rejected: " + cr); + cr.getStore().finishRequest(cr); + } + } + } }