HBASE-2646 Compaction requests should be prioritized to prevent blocking; applied addendum 2464-fix-race-condition-r1004349.txt
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1004371 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
18d7fff188
commit
ea1268b468
|
@ -167,13 +167,23 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
/** Removes the request from the regions in queue
|
||||
* @param p If null it will use the default priority
|
||||
*/
|
||||
protected CompactionRequest removeFromRegionsInQueue(HRegion r) {
|
||||
if (r == null) return null;
|
||||
protected CompactionRequest removeFromRegionsInQueue(CompactionRequest remove) {
|
||||
if (remove == null) return null;
|
||||
|
||||
synchronized (regionsInQueue) {
|
||||
CompactionRequest cr = regionsInQueue.remove(r);
|
||||
CompactionRequest cr = null;
|
||||
cr = regionsInQueue.remove(remove.getHRegion());
|
||||
if (cr != null && !cr.equals(remove))
|
||||
{
|
||||
//Because we don't synchronize across both this.regionsInQueue and this.queue
|
||||
//a rare race condition exists where a higher priority compaction request replaces
|
||||
//the lower priority request in this.regionsInQueue but the lower priority request
|
||||
//is taken off this.queue before the higher can be added to this.queue.
|
||||
//So if we didn't remove what we were expecting we put it back on.
|
||||
regionsInQueue.put(cr.getHRegion(), cr);
|
||||
}
|
||||
if (cr == null) {
|
||||
LOG.warn("Removed a region it couldn't find in regionsInQueue: " + r);
|
||||
LOG.warn("Removed a region it couldn't find in regionsInQueue: " + remove.getHRegion());
|
||||
}
|
||||
return cr;
|
||||
}
|
||||
|
@ -183,7 +193,6 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
CompactionRequest request = this.addToRegionsInQueue(e, p);
|
||||
if (request != null) {
|
||||
boolean result = queue.add(request);
|
||||
queue.peek();
|
||||
return result;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -233,7 +242,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
public HRegion take() throws InterruptedException {
|
||||
CompactionRequest cr = queue.take();
|
||||
if (cr != null) {
|
||||
removeFromRegionsInQueue(cr.getHRegion());
|
||||
removeFromRegionsInQueue(cr);
|
||||
return cr.getHRegion();
|
||||
}
|
||||
return null;
|
||||
|
@ -243,7 +252,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
CompactionRequest cr = queue.poll(timeout, unit);
|
||||
if (cr != null) {
|
||||
removeFromRegionsInQueue(cr.getHRegion());
|
||||
removeFromRegionsInQueue(cr);
|
||||
return cr.getHRegion();
|
||||
}
|
||||
return null;
|
||||
|
@ -251,8 +260,8 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
|
||||
@Override
|
||||
public boolean remove(Object r) {
|
||||
if (r instanceof HRegion) {
|
||||
CompactionRequest cr = removeFromRegionsInQueue((HRegion) r);
|
||||
if (r instanceof CompactionRequest) {
|
||||
CompactionRequest cr = removeFromRegionsInQueue((CompactionRequest) r);
|
||||
if (cr != null) {
|
||||
return queue.remove(cr);
|
||||
}
|
||||
|
@ -265,7 +274,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
public HRegion remove() {
|
||||
CompactionRequest cr = queue.remove();
|
||||
if (cr != null) {
|
||||
removeFromRegionsInQueue(cr.getHRegion());
|
||||
removeFromRegionsInQueue(cr);
|
||||
return cr.getHRegion();
|
||||
}
|
||||
return null;
|
||||
|
@ -275,7 +284,7 @@ public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
|||
public HRegion poll() {
|
||||
CompactionRequest cr = queue.poll();
|
||||
if (cr != null) {
|
||||
removeFromRegionsInQueue(cr.getHRegion());
|
||||
removeFromRegionsInQueue(cr);
|
||||
return cr.getHRegion();
|
||||
}
|
||||
return null;
|
||||
|
|
|
@ -79,7 +79,7 @@ public class TestPriorityCompactionQueue {
|
|||
protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) {
|
||||
pq.add(r, p);
|
||||
try {
|
||||
// Sleep 10 millisecond so 2 things are not put in the queue within the
|
||||
// Sleep 1 millisecond so 2 things are not put in the queue within the
|
||||
// same millisecond. The queue breaks ties arbitrarily between two
|
||||
// requests inserted at the same time. We want the ordering to
|
||||
// be consistent for our unit test.
|
||||
|
|
Loading…
Reference in New Issue