HBASE-3796 Per-Store Entries in Compaction Queue
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1098021 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
15edb7b8e4
commit
47fe311acd
|
@ -194,6 +194,7 @@ Release 0.91.0 - Unreleased
|
|||
HBASE-3812 Tidy up naming consistency and documentation in coprocessor
|
||||
framework (Mingjie Lai)
|
||||
HBASE-1512 Support aggregate functions (Himanshu Vashishtha)
|
||||
HBASE-3796 Per-Store Enties in Compaction Queue
|
||||
|
||||
TASKS
|
||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
|
@ -70,15 +72,17 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
|
|||
@Override
|
||||
public void run() {
|
||||
while (!this.server.isStopped()) {
|
||||
CompactionRequest compactionRequest = null;
|
||||
HRegion r = null;
|
||||
try {
|
||||
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
|
||||
if (r != null) {
|
||||
compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
|
||||
if (compactionRequest != null) {
|
||||
lock.lock();
|
||||
try {
|
||||
if(!this.server.isStopped()) {
|
||||
// Don't interrupt us while we are working
|
||||
byte [] midKey = r.compactStores();
|
||||
r = compactionRequest.getHRegion();
|
||||
byte [] midKey = r.compactStore(compactionRequest.getStore());
|
||||
if (r.getLastCompactInfo() != null) { // compaction aborted?
|
||||
this.server.getMetrics().addCompaction(r.getLastCompactInfo());
|
||||
}
|
||||
|
@ -115,7 +119,9 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
|
|||
|
||||
public synchronized void requestCompaction(final HRegion r,
|
||||
final String why) {
|
||||
requestCompaction(r, false, why, r.getCompactPriority());
|
||||
for(Store s : r.getStores().values()) {
|
||||
requestCompaction(r, s, false, why, s.getCompactPriority());
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void requestCompaction(final HRegion r,
|
||||
|
@ -123,23 +129,33 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
|
|||
requestCompaction(r, false, why, p);
|
||||
}
|
||||
|
||||
public synchronized void requestCompaction(final HRegion r,
|
||||
final boolean force, final String why, int p) {
|
||||
for(Store s : r.getStores().values()) {
|
||||
requestCompaction(r, s, force, why, p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param r HRegion store belongs to
|
||||
* @param force Whether next compaction should be major
|
||||
* @param why Why compaction requested -- used in debug messages
|
||||
*/
|
||||
public synchronized void requestCompaction(final HRegion r,
|
||||
public synchronized void requestCompaction(final HRegion r, final Store s,
|
||||
final boolean force, final String why, int priority) {
|
||||
if (this.server.isStopped()) {
|
||||
return;
|
||||
}
|
||||
// tell the region to major-compact (and don't downgrade it)
|
||||
if (force) {
|
||||
r.setForceMajorCompaction(force);
|
||||
s.setForceMajorCompaction(force);
|
||||
}
|
||||
if (compactionQueue.add(r, priority) && LOG.isDebugEnabled()) {
|
||||
CompactionRequest compactionRequest = new CompactionRequest(r, s, priority);
|
||||
if (compactionQueue.add(compactionRequest) && LOG.isDebugEnabled()) {
|
||||
LOG.debug("Compaction " + (force? "(major) ": "") +
|
||||
"requested for " + r.getRegionNameAsString() +
|
||||
"requested for region " + r.getRegionNameAsString() +
|
||||
"/" + r.getRegionInfo().getEncodedName() +
|
||||
", store " + s +
|
||||
(why != null && !why.isEmpty()? " because " + why: "") +
|
||||
"; priority=" + priority + ", compaction queue size=" + compactionQueue.size());
|
||||
}
|
||||
|
|
|
@ -788,6 +788,20 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return compactStores();
|
||||
}
|
||||
|
||||
/**
|
||||
* Compact all the stores and return the split key of the first store that needs
|
||||
* to be split.
|
||||
*/
|
||||
public byte[] compactStores() throws IOException {
|
||||
byte[] splitRow = null;
|
||||
for(Store s : getStores().values()) {
|
||||
if(splitRow == null) {
|
||||
splitRow = compactStore(s);
|
||||
}
|
||||
}
|
||||
return splitRow;
|
||||
}
|
||||
|
||||
/*
|
||||
* Called by compaction thread and after region is opened to compact the
|
||||
* HStores if necessary.
|
||||
|
@ -802,7 +816,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @return split row if split is needed
|
||||
* @throws IOException e
|
||||
*/
|
||||
public byte [] compactStores() throws IOException {
|
||||
public byte [] compactStore(Store store) throws IOException {
|
||||
if (this.closing.get()) {
|
||||
LOG.debug("Skipping compaction on " + this + " because closing");
|
||||
return null;
|
||||
|
@ -836,17 +850,13 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
long startTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
doRegionCompactionPrep();
|
||||
long lastCompactSize = 0;
|
||||
long maxSize = -1;
|
||||
boolean completed = false;
|
||||
try {
|
||||
for (Store store: stores.values()) {
|
||||
final Store.StoreSize ss = store.compact();
|
||||
lastCompactSize += store.getLastCompactSize();
|
||||
if (ss != null && ss.getSize() > maxSize) {
|
||||
maxSize = ss.getSize();
|
||||
if (ss != null) {
|
||||
splitRow = ss.getSplitRow();
|
||||
}
|
||||
}
|
||||
completed = true;
|
||||
} catch (InterruptedIOException iioe) {
|
||||
LOG.info("compaction interrupted by user: ", iioe);
|
||||
|
@ -2224,6 +2234,10 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return this.stores.get(column);
|
||||
}
|
||||
|
||||
public Map<byte[], Store> getStores() {
|
||||
return this.stores;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// Support code
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -2403,12 +2417,12 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
if (!(o instanceof HRegion)) {
|
||||
return false;
|
||||
}
|
||||
return Bytes.equals(this.regionInfo.getRegionName(), ((HRegion)o).regionInfo.getRegionName());
|
||||
return Bytes.equals(this.getRegionName(), ((HRegion) o).getRegionName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Bytes.hashCode(this.regionInfo.getRegionName());
|
||||
return Bytes.hashCode(this.getRegionName());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,132 +20,69 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
/**
|
||||
* This class delegates to the BlockingQueue but wraps all HRegions in
|
||||
* This class delegates to the BlockingQueue but wraps all Stores in
|
||||
* compaction requests that hold the priority and the date requested.
|
||||
*
|
||||
* Implementation Note: With an elevation time of -1 there is the potential for
|
||||
* starvation of the lower priority compaction requests as long as there is a
|
||||
* constant stream of high priority requests.
|
||||
*/
|
||||
public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
||||
public class PriorityCompactionQueue implements BlockingQueue<CompactionRequest> {
|
||||
static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class);
|
||||
|
||||
/**
|
||||
* This class represents a compaction request and holds the region, priority,
|
||||
* and time submitted.
|
||||
*/
|
||||
private class CompactionRequest implements Comparable<CompactionRequest> {
|
||||
private final HRegion r;
|
||||
private final int p;
|
||||
private final Date date;
|
||||
|
||||
public CompactionRequest(HRegion r, int p) {
|
||||
this(r, p, null);
|
||||
}
|
||||
|
||||
public CompactionRequest(HRegion r, int p, Date d) {
|
||||
if (r == null) {
|
||||
throw new NullPointerException("HRegion cannot be null");
|
||||
}
|
||||
|
||||
if (d == null) {
|
||||
d = new Date();
|
||||
}
|
||||
|
||||
this.r = r;
|
||||
this.p = p;
|
||||
this.date = d;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function will define where in the priority queue the request will
|
||||
* end up. Those with the highest priorities will be first. When the
|
||||
* priorities are the same it will It will first compare priority then date
|
||||
* to maintain a FIFO functionality.
|
||||
*
|
||||
* <p>Note: The date is only accurate to the millisecond which means it is
|
||||
* possible that two requests were inserted into the queue within a
|
||||
* millisecond. When that is the case this function will break the tie
|
||||
* arbitrarily.
|
||||
*/
|
||||
@Override
|
||||
public int compareTo(CompactionRequest request) {
|
||||
//NOTE: The head of the priority queue is the least element
|
||||
if (this.equals(request)) {
|
||||
return 0; //they are the same request
|
||||
}
|
||||
int compareVal;
|
||||
|
||||
compareVal = p - request.p; //compare priority
|
||||
if (compareVal != 0) {
|
||||
return compareVal;
|
||||
}
|
||||
|
||||
compareVal = date.compareTo(request.date);
|
||||
if (compareVal != 0) {
|
||||
return compareVal;
|
||||
}
|
||||
|
||||
//break the tie arbitrarily
|
||||
return -1;
|
||||
}
|
||||
|
||||
/** Gets the HRegion for the request */
|
||||
HRegion getHRegion() {
|
||||
return r;
|
||||
}
|
||||
|
||||
/** Gets the priority for the request */
|
||||
int getPriority() {
|
||||
return p;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "regionName=" + r.getRegionNameAsString() +
|
||||
", priority=" + p + ", date=" + date;
|
||||
}
|
||||
}
|
||||
|
||||
/** The actual blocking queue we delegate to */
|
||||
protected final BlockingQueue<CompactionRequest> queue =
|
||||
new PriorityBlockingQueue<CompactionRequest>();
|
||||
|
||||
/** Hash map of the HRegions contained within the Compaction Queue */
|
||||
private final HashMap<HRegion, CompactionRequest> regionsInQueue =
|
||||
new HashMap<HRegion, CompactionRequest>();
|
||||
/** Hash map of the Stores contained within the Compaction Queue */
|
||||
private final HashMap<Pair<HRegion, Store>, CompactionRequest> storesInQueue =
|
||||
new HashMap<Pair<HRegion, Store>, CompactionRequest>();
|
||||
|
||||
/** Creates a new PriorityCompactionQueue with no priority elevation time */
|
||||
public PriorityCompactionQueue() {
|
||||
LOG.debug("Create PriorityCompactionQueue");
|
||||
}
|
||||
|
||||
/** If the region is not already in the queue it will add it and return a
|
||||
protected Pair<HRegion, Store> toPair(CompactionRequest cr) {
|
||||
return Pair.newPair(cr.getHRegion(), cr.getStore());
|
||||
}
|
||||
|
||||
/** If the store is not already in the queue it will add it and return a
|
||||
* new compaction request object. If it is already present in the queue
|
||||
* then it will return null.
|
||||
* @param p If null it will use the default priority
|
||||
* @return returns a compaction request if it isn't already in the queue
|
||||
*/
|
||||
protected CompactionRequest addToRegionsInQueue(HRegion r, int p) {
|
||||
protected CompactionRequest addToCompactionQueue(CompactionRequest newRequest) {
|
||||
CompactionRequest queuedRequest = null;
|
||||
CompactionRequest newRequest = new CompactionRequest(r, p);
|
||||
synchronized (regionsInQueue) {
|
||||
queuedRequest = regionsInQueue.get(r);
|
||||
synchronized (storesInQueue) {
|
||||
queuedRequest = storesInQueue.get(toPair(newRequest));
|
||||
if (queuedRequest == null ||
|
||||
newRequest.getPriority() < queuedRequest.getPriority()) {
|
||||
LOG.trace("Inserting region in queue. " + newRequest);
|
||||
regionsInQueue.put(r, newRequest);
|
||||
String reason = "";
|
||||
if (queuedRequest != null) {
|
||||
if (newRequest.getPriority() < queuedRequest.getPriority()) {
|
||||
reason = "Reason : priority changed from " +
|
||||
queuedRequest.getPriority() + " to " +
|
||||
newRequest.getPriority() + ". ";
|
||||
}
|
||||
}
|
||||
LOG.debug("Inserting store in queue. " + reason + newRequest);
|
||||
storesInQueue.put(toPair(newRequest), newRequest);
|
||||
} else {
|
||||
LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest +
|
||||
LOG.debug("Store already in queue, skipping. Queued: " + queuedRequest +
|
||||
", requested: " + newRequest);
|
||||
newRequest = null; // It is already present so don't add it
|
||||
}
|
||||
|
@ -159,33 +96,34 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
return newRequest;
|
||||
}
|
||||
|
||||
/** Removes the request from the regions in queue
|
||||
/** Removes the request from the stores in queue
|
||||
* @param remove
|
||||
*/
|
||||
protected CompactionRequest removeFromRegionsInQueue(CompactionRequest remove) {
|
||||
if (remove == null) return null;
|
||||
protected CompactionRequest removeFromQueue(CompactionRequest c) {
|
||||
if (c == null) return null;
|
||||
|
||||
synchronized (regionsInQueue) {
|
||||
CompactionRequest cr = null;
|
||||
cr = regionsInQueue.remove(remove.getHRegion());
|
||||
if (cr != null && !cr.equals(remove))
|
||||
synchronized (storesInQueue) {
|
||||
CompactionRequest cr = storesInQueue.remove(toPair(c));
|
||||
if (cr != null && !cr.equals(c))
|
||||
{
|
||||
//Because we don't synchronize across both this.regionsInQueue and this.queue
|
||||
//a rare race condition exists where a higher priority compaction request replaces
|
||||
//the lower priority request in this.regionsInQueue but the lower priority request
|
||||
//is taken off this.queue before the higher can be added to this.queue.
|
||||
//So if we didn't remove what we were expecting we put it back on.
|
||||
regionsInQueue.put(cr.getHRegion(), cr);
|
||||
storesInQueue.put(toPair(cr), cr);
|
||||
}
|
||||
if (cr == null) {
|
||||
LOG.warn("Removed a region it couldn't find in regionsInQueue: " + remove.getHRegion());
|
||||
LOG.warn("Removed a compaction request it couldn't find in storesInQueue: " +
|
||||
"region = " + c.getHRegion() + ", store = " + c.getStore());
|
||||
}
|
||||
return cr;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean add(HRegion e, int p) {
|
||||
CompactionRequest request = this.addToRegionsInQueue(e, p);
|
||||
@Override
|
||||
public boolean add(CompactionRequest e) {
|
||||
CompactionRequest request = this.addToCompactionQueue(e);
|
||||
if (request != null) {
|
||||
boolean result = queue.add(request);
|
||||
return result;
|
||||
|
@ -195,68 +133,50 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean add(HRegion e) {
|
||||
return add(e, e.getCompactPriority());
|
||||
}
|
||||
|
||||
public boolean offer(HRegion e, int p) {
|
||||
CompactionRequest request = this.addToRegionsInQueue(e, p);
|
||||
public boolean offer(CompactionRequest e) {
|
||||
CompactionRequest request = this.addToCompactionQueue(e);
|
||||
return (request != null)? queue.offer(request): false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean offer(HRegion e) {
|
||||
return offer(e, e.getCompactPriority());
|
||||
}
|
||||
|
||||
public void put(HRegion e, int p) throws InterruptedException {
|
||||
CompactionRequest request = this.addToRegionsInQueue(e, p);
|
||||
public void put(CompactionRequest e) throws InterruptedException {
|
||||
CompactionRequest request = this.addToCompactionQueue(e);
|
||||
if (request != null) {
|
||||
queue.put(request);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(HRegion e) throws InterruptedException {
|
||||
put(e, e.getCompactPriority());
|
||||
}
|
||||
|
||||
public boolean offer(HRegion e, int p, long timeout, TimeUnit unit)
|
||||
public boolean offer(CompactionRequest e, long timeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
CompactionRequest request = this.addToRegionsInQueue(e, p);
|
||||
CompactionRequest request = this.addToCompactionQueue(e);
|
||||
return (request != null)? queue.offer(request, timeout, unit): false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean offer(HRegion e, long timeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
return offer(e, e.getCompactPriority(), timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegion take() throws InterruptedException {
|
||||
public CompactionRequest take() throws InterruptedException {
|
||||
CompactionRequest cr = queue.take();
|
||||
if (cr != null) {
|
||||
removeFromRegionsInQueue(cr);
|
||||
return cr.getHRegion();
|
||||
removeFromQueue(cr);
|
||||
return cr;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
public CompactionRequest poll(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
CompactionRequest cr = queue.poll(timeout, unit);
|
||||
if (cr != null) {
|
||||
removeFromRegionsInQueue(cr);
|
||||
return cr.getHRegion();
|
||||
removeFromQueue(cr);
|
||||
return cr;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Object r) {
|
||||
if (r instanceof CompactionRequest) {
|
||||
CompactionRequest cr = removeFromRegionsInQueue((CompactionRequest) r);
|
||||
public boolean remove(Object o) {
|
||||
if (o instanceof CompactionRequest) {
|
||||
CompactionRequest cr = removeFromQueue((CompactionRequest) o);
|
||||
if (cr != null) {
|
||||
return queue.remove(cr);
|
||||
}
|
||||
|
@ -266,21 +186,21 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public HRegion remove() {
|
||||
public CompactionRequest remove() {
|
||||
CompactionRequest cr = queue.remove();
|
||||
if (cr != null) {
|
||||
removeFromRegionsInQueue(cr);
|
||||
return cr.getHRegion();
|
||||
removeFromQueue(cr);
|
||||
return cr;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegion poll() {
|
||||
public CompactionRequest poll() {
|
||||
CompactionRequest cr = queue.poll();
|
||||
if (cr != null) {
|
||||
removeFromRegionsInQueue(cr);
|
||||
return cr.getHRegion();
|
||||
removeFromQueue(cr);
|
||||
return cr;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -292,9 +212,9 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
|
||||
@Override
|
||||
public boolean contains(Object r) {
|
||||
if (r instanceof HRegion) {
|
||||
synchronized (regionsInQueue) {
|
||||
return regionsInQueue.containsKey((HRegion) r);
|
||||
if (r instanceof CompactionRequest) {
|
||||
synchronized (storesInQueue) {
|
||||
return storesInQueue.containsKey(toPair((CompactionRequest) r));
|
||||
}
|
||||
} else if (r instanceof CompactionRequest) {
|
||||
return queue.contains(r);
|
||||
|
@ -303,15 +223,15 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public HRegion element() {
|
||||
public CompactionRequest element() {
|
||||
CompactionRequest cr = queue.element();
|
||||
return (cr != null)? cr.getHRegion(): null;
|
||||
return (cr != null)? cr: null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegion peek() {
|
||||
public CompactionRequest peek() {
|
||||
CompactionRequest cr = queue.peek();
|
||||
return (cr != null)? cr.getHRegion(): null;
|
||||
return (cr != null)? cr: null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -326,14 +246,14 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
|
||||
@Override
|
||||
public void clear() {
|
||||
regionsInQueue.clear();
|
||||
storesInQueue.clear();
|
||||
queue.clear();
|
||||
}
|
||||
|
||||
// Unimplemented methods, collection methods
|
||||
|
||||
@Override
|
||||
public Iterator<HRegion> iterator() {
|
||||
public Iterator<CompactionRequest> iterator() {
|
||||
throw new UnsupportedOperationException("Not supported.");
|
||||
}
|
||||
|
||||
|
@ -353,7 +273,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(Collection<? extends HRegion> c) {
|
||||
public boolean addAll(Collection<? extends CompactionRequest> c) {
|
||||
throw new UnsupportedOperationException("Not supported.");
|
||||
}
|
||||
|
||||
|
@ -368,12 +288,12 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int drainTo(Collection<? super HRegion> c) {
|
||||
public int drainTo(Collection<? super CompactionRequest> c) {
|
||||
throw new UnsupportedOperationException("Not supported.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int drainTo(Collection<? super HRegion> c, int maxElements) {
|
||||
public int drainTo(Collection<? super CompactionRequest> c, int maxElements) {
|
||||
throw new UnsupportedOperationException("Not supported.");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1479,7 +1479,7 @@ public class Store implements HeapSize {
|
|||
/**
|
||||
* @return The priority that this store should have in the compaction queue
|
||||
*/
|
||||
int getCompactPriority() {
|
||||
public int getCompactPriority() {
|
||||
return this.blockingStoreFileCount - this.storefiles.size();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
|
||||
/**
|
||||
* This class represents a compaction request and holds the region, priority,
|
||||
* and time submitted.
|
||||
*/
|
||||
public class CompactionRequest implements Comparable<CompactionRequest> {
|
||||
static final Log LOG = LogFactory.getLog(CompactionRequest.class);
|
||||
private final HRegion r;
|
||||
private final Store s;
|
||||
private int p;
|
||||
private final Date date;
|
||||
|
||||
public CompactionRequest(HRegion r, Store s) {
|
||||
this(r, s, s.getCompactPriority());
|
||||
}
|
||||
|
||||
public CompactionRequest(HRegion r, Store s, int p) {
|
||||
this(r, s, p, null);
|
||||
}
|
||||
|
||||
public CompactionRequest(HRegion r, Store s, int p, Date d) {
|
||||
if (r == null) {
|
||||
throw new NullPointerException("HRegion cannot be null");
|
||||
}
|
||||
|
||||
if (d == null) {
|
||||
d = new Date();
|
||||
}
|
||||
|
||||
this.r = r;
|
||||
this.s = s;
|
||||
this.p = p;
|
||||
this.date = d;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function will define where in the priority queue the request will
|
||||
* end up. Those with the highest priorities will be first. When the
|
||||
* priorities are the same it will It will first compare priority then date
|
||||
* to maintain a FIFO functionality.
|
||||
*
|
||||
* <p>Note: The date is only accurate to the millisecond which means it is
|
||||
* possible that two requests were inserted into the queue within a
|
||||
* millisecond. When that is the case this function will break the tie
|
||||
* arbitrarily.
|
||||
*/
|
||||
@Override
|
||||
public int compareTo(CompactionRequest request) {
|
||||
//NOTE: The head of the priority queue is the least element
|
||||
if (this.equals(request)) {
|
||||
return 0; //they are the same request
|
||||
}
|
||||
int compareVal;
|
||||
|
||||
compareVal = p - request.p; //compare priority
|
||||
if (compareVal != 0) {
|
||||
return compareVal;
|
||||
}
|
||||
|
||||
compareVal = date.compareTo(request.date);
|
||||
if (compareVal != 0) {
|
||||
return compareVal;
|
||||
}
|
||||
|
||||
//break the tie arbitrarily
|
||||
return -1;
|
||||
}
|
||||
|
||||
/** Gets the HRegion for the request */
|
||||
public HRegion getHRegion() {
|
||||
return r;
|
||||
}
|
||||
|
||||
/** Gets the Store for the request */
|
||||
public Store getStore() {
|
||||
return s;
|
||||
}
|
||||
|
||||
/** Gets the priority for the request */
|
||||
public int getPriority() {
|
||||
return p;
|
||||
}
|
||||
|
||||
/** Gets the priority for the request */
|
||||
public void setPriority(int p) {
|
||||
this.p = p;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "regionName=" + r.getRegionNameAsString() +
|
||||
((s == null) ? ""
|
||||
: "storeName = " + new String(s.getFamily().getName())) +
|
||||
", priority=" + p + ", date=" + date;
|
||||
}
|
||||
}
|
|
@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -61,6 +63,10 @@ public class TestPriorityCompactionQueue {
|
|||
return "[DummyHRegion " + name + "]";
|
||||
}
|
||||
|
||||
public byte[] getRegionName() {
|
||||
return Bytes.toBytes(name);
|
||||
}
|
||||
|
||||
public String getRegionNameAsString() {
|
||||
return name;
|
||||
}
|
||||
|
@ -68,7 +74,7 @@ public class TestPriorityCompactionQueue {
|
|||
|
||||
protected void getAndCheckRegion(PriorityCompactionQueue pq,
|
||||
HRegion checkRegion) {
|
||||
HRegion r = pq.remove();
|
||||
HRegion r = pq.remove().getHRegion();
|
||||
if (r != checkRegion) {
|
||||
Assert.assertTrue("Didn't get expected " + checkRegion + " got " + r, r
|
||||
.equals(checkRegion));
|
||||
|
@ -76,7 +82,7 @@ public class TestPriorityCompactionQueue {
|
|||
}
|
||||
|
||||
protected void addRegion(PriorityCompactionQueue pq, HRegion r, int p) {
|
||||
pq.add(r, p);
|
||||
pq.add(new CompactionRequest(r, null, p));
|
||||
try {
|
||||
// Sleep 1 millisecond so 2 things are not put in the queue within the
|
||||
// same millisecond. The queue breaks ties arbitrarily between two
|
||||
|
|
Loading…
Reference in New Issue