From 7b4eb5b5054a4b74d7f5dd9b668d40f2b09efeeb Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Fri, 29 Oct 2010 06:08:44 +0000 Subject: [PATCH] HBASE-3160 Use more intelligent priorities for PriorityCompactionQueue git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1028622 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../regionserver/CompactSplitThread.java | 47 ++----- .../regionserver/CompactionRequestor.java | 7 ++ .../hadoop/hbase/regionserver/HRegion.java | 11 ++ .../hbase/regionserver/HRegionServer.java | 6 +- .../hbase/regionserver/MemStoreFlusher.java | 3 +- .../regionserver/PriorityCompactionQueue.java | 35 +++--- .../hadoop/hbase/regionserver/Store.java | 10 ++ .../TestPriorityCompactionQueue.java | 117 +++++++++--------- 9 files changed, 119 insertions(+), 119 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 5ff9f7760b7..b3896bd3ec6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -622,6 +622,8 @@ Release 0.21.0 - Unreleased HBASE-3012 TOF doesn't take zk client port for remote clusters HBASE-3159 Double play of OpenedRegionHandler for a single region and assorted fixes around this + TestRollingRestart added + HBASE-3160 Use more intelligent priorities for PriorityCompactionQueue + (Nicolas Spiegelberg via Stack) IMPROVEMENTS diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 493ad9f70ba..a864114701c 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -43,33 +43,10 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { private final PriorityCompactionQueue compactionQueue = new PriorityCompactionQueue(); - /** The priorities for a compaction request. */ - public enum Priority implements Comparable { - //NOTE: All priorities should be numbered consecutively starting with 1. - //The highest priority should be 1 followed by all lower priorities. - //Priorities can be changed at anytime without requiring any changes to the - //queue. - - /** HIGH_BLOCKING should only be used when an operation is blocked until a - * compact / split is done (e.g. a MemStore can't flush because it has - * "too many store files" and is blocking until a compact / split is done) - */ - HIGH_BLOCKING(1), - /** A normal compaction / split request */ - NORMAL(2), - /** A low compaction / split request -- not currently used */ - LOW(3); - - int value; - - Priority(int value) { - this.value = value; - } - - int getInt() { - return value; - } - } + /* The default priority for user-specified compaction requests. + * The user gets top priority unless we have blocking compactions. (Pri <= 0) + */ + public static final int PRIORITY_USER = 1; /** * Splitting should not take place if the total number of regions exceed this. @@ -138,30 +115,28 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { public synchronized void requestCompaction(final HRegion r, final String why) { - requestCompaction(r, false, why, Priority.NORMAL); + requestCompaction(r, false, why, r.getCompactPriority()); } public synchronized void requestCompaction(final HRegion r, - final String why, Priority p) { + final String why, int p) { requestCompaction(r, false, why, p); } - public synchronized void requestCompaction(final HRegion r, - final boolean force, final String why) { - requestCompaction(r, force, why, Priority.NORMAL); - } - /** * @param r HRegion store belongs to * @param force Whether next compaction should be major * @param why Why compaction requested -- used in debug messages */ public synchronized void requestCompaction(final HRegion r, - final boolean force, final String why, Priority priority) { + final boolean force, final String why, int priority) { if (this.server.isStopped()) { return; } - r.setForceMajorCompaction(force); + // tell the region to major-compact (and don't downgrade it) + if (force) { + r.setForceMajorCompaction(force); + } if (compactionQueue.add(r, priority) && LOG.isDebugEnabled()) { LOG.debug("Compaction " + (force? "(major) ": "") + "requested for region " + r.getRegionNameAsString() + diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java index b7e868da76c..f3be5e4d86a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java @@ -25,4 +25,11 @@ public interface CompactionRequestor { * @param why Why compaction was requested -- used in debug messages */ public void requestCompaction(final HRegion r, final String why); + + /** + * @param r Region to compact + * @param why Why compaction was requested -- used in debug messages + * @param pri Priority of this compaction. minHeap. <=0 is critical + */ + public void requestCompaction(final HRegion r, final String why, int pri); } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ec673561889..ccaa2fc4f99 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3279,6 +3279,17 @@ public class HRegion implements HeapSize { // , Writable{ // nothing } + /** + * @return The priority that this region should have in the compaction queue + */ + public int getCompactPriority() { + int count = Integer.MAX_VALUE; + for(Store store : stores.values()) { + count = Math.min(count, store.getCompactPriority()); + } + return count; + } + /** * Checks every store to see if one has too many * store files diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8c4d67fc9c4..941bcf4c67a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2023,7 +2023,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // force a compaction, split will be side-effect // TODO: flush/compact/split refactor will make it trivial to do this // sync/async (and won't require us to do a compaction to split!) - compactSplitThread.requestCompaction(region, "User-triggered split"); + compactSplitThread.requestCompaction(region, "User-triggered split", + CompactSplitThread.PRIORITY_USER); } @Override @@ -2033,7 +2034,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, region.flushcache(); region.shouldSplit(true); compactSplitThread.requestCompaction(region, major, "User-triggered " - + (major ? "major " : "") + "compaction"); + + (major ? "major " : "") + "compaction", + CompactSplitThread.PRIORITY_USER); } /** @return the info server */ diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 3eb8108a33b..a0e74a00aa7 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -212,8 +212,7 @@ class MemStoreFlusher extends Thread implements FlushRequester { LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); } - this.server.compactSplitThread.requestCompaction(region, getName(), - CompactSplitThread.Priority.HIGH_BLOCKING); + this.server.compactSplitThread.requestCompaction(region, getName()); // Put back on the queue. Have it come back out of the queue // after a delay of this.blockingWaitTime / 100 ms. this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java b/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java index 5e999fb4797..0bce554a177 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java @@ -28,7 +28,6 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority; /** * This class delegates to the BlockingQueue but wraps all HRegions in @@ -47,22 +46,18 @@ public class PriorityCompactionQueue implements BlockingQueue { */ private class CompactionRequest implements Comparable { private final HRegion r; - private final Priority p; + private final int p; private final Date date; - public CompactionRequest(HRegion r, Priority p) { + public CompactionRequest(HRegion r, int p) { this(r, p, null); } - public CompactionRequest(HRegion r, Priority p, Date d) { + public CompactionRequest(HRegion r, int p, Date d) { if (r == null) { throw new NullPointerException("HRegion cannot be null"); } - if (p == null) { - p = Priority.NORMAL; //the default priority - } - if (d == null) { d = new Date(); } @@ -91,7 +86,7 @@ public class PriorityCompactionQueue implements BlockingQueue { } int compareVal; - compareVal = p.compareTo(request.p); //compare priority + compareVal = p - request.p; //compare priority if (compareVal != 0) { return compareVal; } @@ -111,7 +106,7 @@ public class PriorityCompactionQueue implements BlockingQueue { } /** Gets the priority for the request */ - Priority getPriority() { + int getPriority() { return p; } @@ -140,13 +135,13 @@ public class PriorityCompactionQueue implements BlockingQueue { * @param p If null it will use the default priority * @return returns a compaction request if it isn't already in the queue */ - protected CompactionRequest addToRegionsInQueue(HRegion r, Priority p) { + protected CompactionRequest addToRegionsInQueue(HRegion r, int p) { CompactionRequest queuedRequest = null; CompactionRequest newRequest = new CompactionRequest(r, p); synchronized (regionsInQueue) { queuedRequest = regionsInQueue.get(r); if (queuedRequest == null || - newRequest.getPriority().compareTo(queuedRequest.getPriority()) < 0) { + newRequest.getPriority() < queuedRequest.getPriority()) { LOG.trace("Inserting region in queue. " + newRequest); regionsInQueue.put(r, newRequest); } else { @@ -189,7 +184,7 @@ public class PriorityCompactionQueue implements BlockingQueue { } } - public boolean add(HRegion e, Priority p) { + public boolean add(HRegion e, int p) { CompactionRequest request = this.addToRegionsInQueue(e, p); if (request != null) { boolean result = queue.add(request); @@ -201,20 +196,20 @@ public class PriorityCompactionQueue implements BlockingQueue { @Override public boolean add(HRegion e) { - return add(e, null); + return add(e, e.getCompactPriority()); } - public boolean offer(HRegion e, Priority p) { + public boolean offer(HRegion e, int p) { CompactionRequest request = this.addToRegionsInQueue(e, p); return (request != null)? queue.offer(request): false; } @Override public boolean offer(HRegion e) { - return offer(e, null); + return offer(e, e.getCompactPriority()); } - public void put(HRegion e, Priority p) throws InterruptedException { + public void put(HRegion e, int p) throws InterruptedException { CompactionRequest request = this.addToRegionsInQueue(e, p); if (request != null) { queue.put(request); @@ -223,10 +218,10 @@ public class PriorityCompactionQueue implements BlockingQueue { @Override public void put(HRegion e) throws InterruptedException { - put(e, null); + put(e, e.getCompactPriority()); } - public boolean offer(HRegion e, Priority p, long timeout, TimeUnit unit) + public boolean offer(HRegion e, int p, long timeout, TimeUnit unit) throws InterruptedException { CompactionRequest request = this.addToRegionsInQueue(e, p); return (request != null)? queue.offer(request, timeout, unit): false; @@ -235,7 +230,7 @@ public class PriorityCompactionQueue implements BlockingQueue { @Override public boolean offer(HRegion e, long timeout, TimeUnit unit) throws InterruptedException { - return offer(e, null, timeout, unit); + return offer(e, e.getCompactPriority(), timeout, unit); } @Override diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 981dfaf0b0e..66e8a074bff 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -95,6 +95,7 @@ public class Store implements HeapSize { /* how many bytes to write between status checks */ static int closeCheckInterval = 0; private final long desiredMaxFileSize; + private final int blockingStoreFileCount; private volatile long storeSize = 0L; private final Object flushLock = new Object(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -186,6 +187,8 @@ public class Store implements HeapSize { HConstants.DEFAULT_MAX_FILE_SIZE); } this.desiredMaxFileSize = maxFileSize; + this.blockingStoreFileCount = + conf.getInt("hbase.hstore.blockingStoreFiles", -1); this.majorCompactionTime = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 86400000); @@ -1331,6 +1334,13 @@ public class Store implements HeapSize { } return size; } + + /** + * @return The priority that this store should have in the compaction queue + */ + int getCompactPriority() { + return this.blockingStoreFileCount - this.storefiles.size(); + } /** * Datastructure that holds size and row to split a file around. diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java index 2607acb84be..4d52c327e8a 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -76,7 +75,7 @@ public class TestPriorityCompactionQueue { } } - protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) { + protected void addRegion(PriorityCompactionQueue pq, HRegion r, int p) { pq.add(r, p); try { // Sleep 1 millisecond so 2 things are not put in the queue within the @@ -105,11 +104,11 @@ public class TestPriorityCompactionQueue { // test 1 // check fifo w/priority - addRegion(pq, r1, Priority.HIGH_BLOCKING); - addRegion(pq, r2, Priority.HIGH_BLOCKING); - addRegion(pq, r3, Priority.HIGH_BLOCKING); - addRegion(pq, r4, Priority.HIGH_BLOCKING); - addRegion(pq, r5, Priority.HIGH_BLOCKING); + addRegion(pq, r1, CompactSplitThread.PRIORITY_BLOCKING); + addRegion(pq, r2, CompactSplitThread.PRIORITY_BLOCKING); + addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING); + addRegion(pq, r4, CompactSplitThread.PRIORITY_BLOCKING); + addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING); getAndCheckRegion(pq, r1); getAndCheckRegion(pq, r2); @@ -118,55 +117,41 @@ public class TestPriorityCompactionQueue { getAndCheckRegion(pq, r5); // test 2 - // check fifo - addRegion(pq, r1, null); - addRegion(pq, r2, null); - addRegion(pq, r3, null); - addRegion(pq, r4, null); - addRegion(pq, r5, null); + // check fifo w/mixed priority + addRegion(pq, r1, CompactSplitThread.PRIORITY_BLOCKING); + addRegion(pq, r2, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING); getAndCheckRegion(pq, r1); - getAndCheckRegion(pq, r2); getAndCheckRegion(pq, r3); - getAndCheckRegion(pq, r4); getAndCheckRegion(pq, r5); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r4); // test 3 // check fifo w/mixed priority - addRegion(pq, r1, Priority.HIGH_BLOCKING); - addRegion(pq, r2, Priority.NORMAL); - addRegion(pq, r3, Priority.HIGH_BLOCKING); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.HIGH_BLOCKING); + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING); - getAndCheckRegion(pq, r1); - getAndCheckRegion(pq, r3); getAndCheckRegion(pq, r5); + getAndCheckRegion(pq, r1); getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); getAndCheckRegion(pq, r4); // test 4 - // check fifo w/mixed priority - addRegion(pq, r1, Priority.NORMAL); - addRegion(pq, r2, Priority.NORMAL); - addRegion(pq, r3, Priority.NORMAL); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.HIGH_BLOCKING); - - getAndCheckRegion(pq, r5); - getAndCheckRegion(pq, r1); - getAndCheckRegion(pq, r2); - getAndCheckRegion(pq, r3); - getAndCheckRegion(pq, r4); - - // test 5 // check fifo w/mixed priority elevation time - addRegion(pq, r1, Priority.NORMAL); - addRegion(pq, r2, Priority.HIGH_BLOCKING); - addRegion(pq, r3, Priority.NORMAL); + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, CompactSplitThread.PRIORITY_BLOCKING); + addRegion(pq, r3, CompactSplitThread.PRIORITY_USER); Thread.sleep(1000); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.HIGH_BLOCKING); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING); getAndCheckRegion(pq, r2); getAndCheckRegion(pq, r5); @@ -177,15 +162,15 @@ public class TestPriorityCompactionQueue { // reset the priority compaction queue back to a normal queue pq = new PriorityCompactionQueue(); - // test 7 + // test 5 // test that lower priority are removed from the queue when a high priority // is added - addRegion(pq, r1, Priority.NORMAL); - addRegion(pq, r2, Priority.NORMAL); - addRegion(pq, r3, Priority.NORMAL); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.NORMAL); - addRegion(pq, r3, Priority.HIGH_BLOCKING); + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING); getAndCheckRegion(pq, r3); getAndCheckRegion(pq, r1); @@ -195,18 +180,18 @@ public class TestPriorityCompactionQueue { Assert.assertTrue("Queue should be empty.", pq.size() == 0); - // test 8 + // test 6 // don't add the same region more than once - addRegion(pq, r1, Priority.NORMAL); - addRegion(pq, r2, Priority.NORMAL); - addRegion(pq, r3, Priority.NORMAL); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.NORMAL); - addRegion(pq, r1, Priority.NORMAL); - addRegion(pq, r2, Priority.NORMAL); - addRegion(pq, r3, Priority.NORMAL); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.NORMAL); + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, CompactSplitThread.PRIORITY_USER); getAndCheckRegion(pq, r1); getAndCheckRegion(pq, r2); @@ -215,5 +200,19 @@ public class TestPriorityCompactionQueue { getAndCheckRegion(pq, r5); Assert.assertTrue("Queue should be empty.", pq.size() == 0); + + // test 7 + // we can handle negative priorities + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, -1); + addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING); + addRegion(pq, r4, -2); + + getAndCheckRegion(pq, r4); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r1); + + Assert.assertTrue("Queue should be empty.", pq.size() == 0); } } \ No newline at end of file