HBASE-3160 Use more intelligent priorities for PriorityCompactionQueue

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1028622 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-10-29 06:08:44 +00:00
parent 0944bd9fb0
commit 7b4eb5b505
9 changed files with 119 additions and 119 deletions

View File

@ -622,6 +622,8 @@ Release 0.21.0 - Unreleased
HBASE-3012 TOF doesn't take zk client port for remote clusters
HBASE-3159 Double play of OpenedRegionHandler for a single region
and assorted fixes around this + TestRollingRestart added
HBASE-3160 Use more intelligent priorities for PriorityCompactionQueue
(Nicolas Spiegelberg via Stack)
IMPROVEMENTS

View File

@ -43,33 +43,10 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
private final PriorityCompactionQueue compactionQueue =
new PriorityCompactionQueue();
/** The priorities for a compaction request. */
public enum Priority implements Comparable<Priority> {
//NOTE: All priorities should be numbered consecutively starting with 1.
//The highest priority should be 1 followed by all lower priorities.
//Priorities can be changed at anytime without requiring any changes to the
//queue.
/** HIGH_BLOCKING should only be used when an operation is blocked until a
* compact / split is done (e.g. a MemStore can't flush because it has
* "too many store files" and is blocking until a compact / split is done)
*/
HIGH_BLOCKING(1),
/** A normal compaction / split request */
NORMAL(2),
/** A low compaction / split request -- not currently used */
LOW(3);
int value;
Priority(int value) {
this.value = value;
}
int getInt() {
return value;
}
}
/* The default priority for user-specified compaction requests.
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
*/
public static final int PRIORITY_USER = 1;
/**
* Splitting should not take place if the total number of regions exceed this.
@ -138,30 +115,28 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
public synchronized void requestCompaction(final HRegion r,
final String why) {
requestCompaction(r, false, why, Priority.NORMAL);
requestCompaction(r, false, why, r.getCompactPriority());
}
public synchronized void requestCompaction(final HRegion r,
final String why, Priority p) {
final String why, int p) {
requestCompaction(r, false, why, p);
}
public synchronized void requestCompaction(final HRegion r,
final boolean force, final String why) {
requestCompaction(r, force, why, Priority.NORMAL);
}
/**
* @param r HRegion store belongs to
* @param force Whether next compaction should be major
* @param why Why compaction requested -- used in debug messages
*/
public synchronized void requestCompaction(final HRegion r,
final boolean force, final String why, Priority priority) {
final boolean force, final String why, int priority) {
if (this.server.isStopped()) {
return;
}
r.setForceMajorCompaction(force);
// tell the region to major-compact (and don't downgrade it)
if (force) {
r.setForceMajorCompaction(force);
}
if (compactionQueue.add(r, priority) && LOG.isDebugEnabled()) {
LOG.debug("Compaction " + (force? "(major) ": "") +
"requested for region " + r.getRegionNameAsString() +

View File

@ -25,4 +25,11 @@ public interface CompactionRequestor {
* @param why Why compaction was requested -- used in debug messages
*/
public void requestCompaction(final HRegion r, final String why);
/**
* @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. <=0 is critical
*/
public void requestCompaction(final HRegion r, final String why, int pri);
}

View File

@ -3279,6 +3279,17 @@ public class HRegion implements HeapSize { // , Writable{
// nothing
}
/**
* @return The priority that this region should have in the compaction queue
*/
public int getCompactPriority() {
int count = Integer.MAX_VALUE;
for(Store store : stores.values()) {
count = Math.min(count, store.getCompactPriority());
}
return count;
}
/**
* Checks every store to see if one has too many
* store files

View File

@ -2023,7 +2023,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// force a compaction, split will be side-effect
// TODO: flush/compact/split refactor will make it trivial to do this
// sync/async (and won't require us to do a compaction to split!)
compactSplitThread.requestCompaction(region, "User-triggered split");
compactSplitThread.requestCompaction(region, "User-triggered split",
CompactSplitThread.PRIORITY_USER);
}
@Override
@ -2033,7 +2034,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
region.flushcache();
region.shouldSplit(true);
compactSplitThread.requestCompaction(region, major, "User-triggered "
+ (major ? "major " : "") + "compaction");
+ (major ? "major " : "") + "compaction",
CompactSplitThread.PRIORITY_USER);
}
/** @return the info server */

View File

@ -212,8 +212,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
}
this.server.compactSplitThread.requestCompaction(region, getName(),
CompactSplitThread.Priority.HIGH_BLOCKING);
this.server.compactSplitThread.requestCompaction(region, getName());
// Put back on the queue. Have it come back out of the queue
// after a delay of this.blockingWaitTime / 100 ms.
this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));

View File

@ -28,7 +28,6 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority;
/**
* This class delegates to the BlockingQueue but wraps all HRegions in
@ -47,22 +46,18 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
*/
private class CompactionRequest implements Comparable<CompactionRequest> {
private final HRegion r;
private final Priority p;
private final int p;
private final Date date;
public CompactionRequest(HRegion r, Priority p) {
public CompactionRequest(HRegion r, int p) {
this(r, p, null);
}
public CompactionRequest(HRegion r, Priority p, Date d) {
public CompactionRequest(HRegion r, int p, Date d) {
if (r == null) {
throw new NullPointerException("HRegion cannot be null");
}
if (p == null) {
p = Priority.NORMAL; //the default priority
}
if (d == null) {
d = new Date();
}
@ -91,7 +86,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
}
int compareVal;
compareVal = p.compareTo(request.p); //compare priority
compareVal = p - request.p; //compare priority
if (compareVal != 0) {
return compareVal;
}
@ -111,7 +106,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
}
/** Gets the priority for the request */
Priority getPriority() {
int getPriority() {
return p;
}
@ -140,13 +135,13 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
* @param p If null it will use the default priority
* @return returns a compaction request if it isn't already in the queue
*/
protected CompactionRequest addToRegionsInQueue(HRegion r, Priority p) {
protected CompactionRequest addToRegionsInQueue(HRegion r, int p) {
CompactionRequest queuedRequest = null;
CompactionRequest newRequest = new CompactionRequest(r, p);
synchronized (regionsInQueue) {
queuedRequest = regionsInQueue.get(r);
if (queuedRequest == null ||
newRequest.getPriority().compareTo(queuedRequest.getPriority()) < 0) {
newRequest.getPriority() < queuedRequest.getPriority()) {
LOG.trace("Inserting region in queue. " + newRequest);
regionsInQueue.put(r, newRequest);
} else {
@ -189,7 +184,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
}
}
public boolean add(HRegion e, Priority p) {
public boolean add(HRegion e, int p) {
CompactionRequest request = this.addToRegionsInQueue(e, p);
if (request != null) {
boolean result = queue.add(request);
@ -201,20 +196,20 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
@Override
public boolean add(HRegion e) {
return add(e, null);
return add(e, e.getCompactPriority());
}
public boolean offer(HRegion e, Priority p) {
public boolean offer(HRegion e, int p) {
CompactionRequest request = this.addToRegionsInQueue(e, p);
return (request != null)? queue.offer(request): false;
}
@Override
public boolean offer(HRegion e) {
return offer(e, null);
return offer(e, e.getCompactPriority());
}
public void put(HRegion e, Priority p) throws InterruptedException {
public void put(HRegion e, int p) throws InterruptedException {
CompactionRequest request = this.addToRegionsInQueue(e, p);
if (request != null) {
queue.put(request);
@ -223,10 +218,10 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
@Override
public void put(HRegion e) throws InterruptedException {
put(e, null);
put(e, e.getCompactPriority());
}
public boolean offer(HRegion e, Priority p, long timeout, TimeUnit unit)
public boolean offer(HRegion e, int p, long timeout, TimeUnit unit)
throws InterruptedException {
CompactionRequest request = this.addToRegionsInQueue(e, p);
return (request != null)? queue.offer(request, timeout, unit): false;
@ -235,7 +230,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
@Override
public boolean offer(HRegion e, long timeout, TimeUnit unit)
throws InterruptedException {
return offer(e, null, timeout, unit);
return offer(e, e.getCompactPriority(), timeout, unit);
}
@Override

View File

@ -95,6 +95,7 @@ public class Store implements HeapSize {
/* how many bytes to write between status checks */
static int closeCheckInterval = 0;
private final long desiredMaxFileSize;
private final int blockingStoreFileCount;
private volatile long storeSize = 0L;
private final Object flushLock = new Object();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@ -186,6 +187,8 @@ public class Store implements HeapSize {
HConstants.DEFAULT_MAX_FILE_SIZE);
}
this.desiredMaxFileSize = maxFileSize;
this.blockingStoreFileCount =
conf.getInt("hbase.hstore.blockingStoreFiles", -1);
this.majorCompactionTime =
conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 86400000);
@ -1331,6 +1334,13 @@ public class Store implements HeapSize {
}
return size;
}
/**
* @return The priority that this store should have in the compaction queue
*/
int getCompactPriority() {
return this.blockingStoreFileCount - this.storefiles.size();
}
/**
* Datastructure that holds size and row to split a file around.

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -76,7 +75,7 @@ public class TestPriorityCompactionQueue {
}
}
protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) {
protected void addRegion(PriorityCompactionQueue pq, HRegion r, int p) {
pq.add(r, p);
try {
// Sleep 1 millisecond so 2 things are not put in the queue within the
@ -105,11 +104,11 @@ public class TestPriorityCompactionQueue {
// test 1
// check fifo w/priority
addRegion(pq, r1, Priority.HIGH_BLOCKING);
addRegion(pq, r2, Priority.HIGH_BLOCKING);
addRegion(pq, r3, Priority.HIGH_BLOCKING);
addRegion(pq, r4, Priority.HIGH_BLOCKING);
addRegion(pq, r5, Priority.HIGH_BLOCKING);
addRegion(pq, r1, CompactSplitThread.PRIORITY_BLOCKING);
addRegion(pq, r2, CompactSplitThread.PRIORITY_BLOCKING);
addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING);
addRegion(pq, r4, CompactSplitThread.PRIORITY_BLOCKING);
addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r2);
@ -118,55 +117,41 @@ public class TestPriorityCompactionQueue {
getAndCheckRegion(pq, r5);
// test 2
// check fifo
addRegion(pq, r1, null);
addRegion(pq, r2, null);
addRegion(pq, r3, null);
addRegion(pq, r4, null);
addRegion(pq, r5, null);
// check fifo w/mixed priority
addRegion(pq, r1, CompactSplitThread.PRIORITY_BLOCKING);
addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r4);
getAndCheckRegion(pq, r5);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r4);
// test 3
// check fifo w/mixed priority
addRegion(pq, r1, Priority.HIGH_BLOCKING);
addRegion(pq, r2, Priority.NORMAL);
addRegion(pq, r3, Priority.HIGH_BLOCKING);
addRegion(pq, r4, Priority.NORMAL);
addRegion(pq, r5, Priority.HIGH_BLOCKING);
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r5);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r4);
// test 4
// check fifo w/mixed priority
addRegion(pq, r1, Priority.NORMAL);
addRegion(pq, r2, Priority.NORMAL);
addRegion(pq, r3, Priority.NORMAL);
addRegion(pq, r4, Priority.NORMAL);
addRegion(pq, r5, Priority.HIGH_BLOCKING);
getAndCheckRegion(pq, r5);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r4);
// test 5
// check fifo w/mixed priority elevation time
addRegion(pq, r1, Priority.NORMAL);
addRegion(pq, r2, Priority.HIGH_BLOCKING);
addRegion(pq, r3, Priority.NORMAL);
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, CompactSplitThread.PRIORITY_BLOCKING);
addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
Thread.sleep(1000);
addRegion(pq, r4, Priority.NORMAL);
addRegion(pq, r5, Priority.HIGH_BLOCKING);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r5);
@ -177,15 +162,15 @@ public class TestPriorityCompactionQueue {
// reset the priority compaction queue back to a normal queue
pq = new PriorityCompactionQueue();
// test 7
// test 5
// test that lower priority are removed from the queue when a high priority
// is added
addRegion(pq, r1, Priority.NORMAL);
addRegion(pq, r2, Priority.NORMAL);
addRegion(pq, r3, Priority.NORMAL);
addRegion(pq, r4, Priority.NORMAL);
addRegion(pq, r5, Priority.NORMAL);
addRegion(pq, r3, Priority.HIGH_BLOCKING);
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r1);
@ -195,18 +180,18 @@ public class TestPriorityCompactionQueue {
Assert.assertTrue("Queue should be empty.", pq.size() == 0);
// test 8
// test 6
// don't add the same region more than once
addRegion(pq, r1, Priority.NORMAL);
addRegion(pq, r2, Priority.NORMAL);
addRegion(pq, r3, Priority.NORMAL);
addRegion(pq, r4, Priority.NORMAL);
addRegion(pq, r5, Priority.NORMAL);
addRegion(pq, r1, Priority.NORMAL);
addRegion(pq, r2, Priority.NORMAL);
addRegion(pq, r3, Priority.NORMAL);
addRegion(pq, r4, Priority.NORMAL);
addRegion(pq, r5, Priority.NORMAL);
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r2);
@ -215,5 +200,19 @@ public class TestPriorityCompactionQueue {
getAndCheckRegion(pq, r5);
Assert.assertTrue("Queue should be empty.", pq.size() == 0);
// test 7
// we can handle negative priorities
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, -1);
addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING);
addRegion(pq, r4, -2);
getAndCheckRegion(pq, r4);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r1);
Assert.assertTrue("Queue should be empty.", pq.size() == 0);
}
}