From f4ac670a42adbe42183a07d6cb486d39fbb4eac3 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Tue, 28 Sep 2010 05:26:22 +0000 Subject: [PATCH] HBASE-2646 Compaction requests should be prioritized to prevent blocking git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1002019 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../regionserver/CompactSplitThread.java | 63 ++- .../hbase/regionserver/MemStoreFlusher.java | 3 +- .../regionserver/PriorityCompactionQueue.java | 375 ++++++++++++++++++ .../wal/SequenceFileLogReader.java | 43 +- .../TestPriorityCompactionQueue.java | 219 ++++++++++ 6 files changed, 676 insertions(+), 29 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java create mode 100644 src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java diff --git a/CHANGES.txt b/CHANGES.txt index c84a00c7741..79b3734843f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -947,6 +947,8 @@ Release 0.21.0 - Unreleased (Andy Chen via Stack) HBASE-3030 The return code of many filesystem operations are not checked (dhruba borthakur via Stack) + HBASE-2646 Compaction requests should be prioritized to prevent blocking + (Jeff Whiting via Stack) NEW FEATURES HBASE-1961 HBase EC2 scripts diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index c27ba7a8d1a..95c3b9eeae4 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -20,9 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.HashSet; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -43,10 +40,36 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { private final HRegionServer server; private final Configuration conf; - private final BlockingQueue compactionQueue = - new LinkedBlockingQueue(); + private final PriorityCompactionQueue compactionQueue = + new PriorityCompactionQueue(); - private final HashSet regionsInQueue = new HashSet(); + /** The priorities for a compaction request. */ + public enum Priority implements Comparable { + //NOTE: All priorities should be numbered consecutively starting with 1. + //The highest priority should be 1 followed by all lower priorities. + //Priorities can be changed at anytime without requiring any changes to the + //queue. + + /** HIGH_BLOCKING should only be used when an operation is blocked until a + * compact / split is done (e.g. a MemStore can't flush because it has + * "too many store files" and is blocking until a compact / split is done) + */ + HIGH_BLOCKING(1), + /** A normal compaction / split request */ + NORMAL(2), + /** A low compaction / split request -- not currently used */ + LOW(3); + + int value; + + Priority(int value) { + this.value = value; + } + + int getInt() { + return value; + } + } /** * Splitting should not take place if the total number of regions exceed this. @@ -74,9 +97,6 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { try { r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); if (r != null && !this.server.isStopped()) { - synchronized (regionsInQueue) { - regionsInQueue.remove(r); - } lock.lock(); try { // Don't interrupt us while we are working @@ -107,14 +127,23 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { } } } - regionsInQueue.clear(); compactionQueue.clear(); LOG.info(getName() + " exiting"); } public synchronized void requestCompaction(final HRegion r, final String why) { - requestCompaction(r, false, why); + requestCompaction(r, false, why, Priority.NORMAL); + } + + public synchronized void requestCompaction(final HRegion r, + final String why, Priority p) { + requestCompaction(r, false, why, p); + } + + public synchronized void requestCompaction(final HRegion r, + final boolean force, final String why) { + requestCompaction(r, force, why, Priority.NORMAL); } /** @@ -123,7 +152,7 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { * @param why Why compaction requested -- used in debug messages */ public synchronized void requestCompaction(final HRegion r, - final boolean force, final String why) { + final boolean force, final String why, Priority priority) { if (this.server.isStopped()) { return; } @@ -131,14 +160,10 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { if (LOG.isDebugEnabled()) { LOG.debug("Compaction " + (force? "(major) ": "") + "requested for region " + r.getRegionNameAsString() + - (why != null && !why.isEmpty()? " because: " + why: "")); - } - synchronized (regionsInQueue) { - if (!regionsInQueue.contains(r)) { - compactionQueue.add(r); - regionsInQueue.add(r); - } + (why != null && !why.isEmpty()? " because: " + why: "") + + "; Priority: " + priority + "; Compaction queue size: " + compactionQueue.size()); } + compactionQueue.add(r, priority); } private void split(final HRegion parent, final byte [] midKey) diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index b2357e9e734..c773ecb3b72 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -212,7 +212,8 @@ class MemStoreFlusher extends Thread implements FlushRequester { LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); } - this.server.compactSplitThread.requestCompaction(region, getName()); + this.server.compactSplitThread.requestCompaction(region, getName(), + CompactSplitThread.Priority.HIGH_BLOCKING); // Put back on the queue. Have it come back out of the queue // after a delay of this.blockingWaitTime / 100 ms. this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java b/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java new file mode 100644 index 00000000000..7c17529738e --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java @@ -0,0 +1,375 @@ +/** +* Copyright 2010 The Apache Software Foundation +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority; + +/** + * This class delegates to the BlockingQueue but wraps all HRegions in + * compaction requests that hold the priority and the date requested. + * + * Implementation Note: With an elevation time of -1 there is the potential for + * starvation of the lower priority compaction requests as long as there is a + * constant stream of high priority requests. + */ +public class PriorityCompactionQueue implements BlockingQueue { + static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class); + + /** + * This class represents a compaction request and holds the region, priority, + * and time submitted. + */ + private class CompactionRequest implements Comparable { + private final HRegion r; + private final Priority p; + private final Date date; + + public CompactionRequest(HRegion r, Priority p) { + this(r, p, null); + } + + public CompactionRequest(HRegion r, Priority p, Date d) { + if (r == null) { + throw new NullPointerException("HRegion cannot be null"); + } + + if (p == null) { + p = Priority.NORMAL; //the default priority + } + + if (d == null) { + d = new Date(); + } + + this.r = r; + this.p = p; + this.date = d; + } + + /** + * This function will define where in the priority queue the request will + * end up. Those with the highest priorities will be first. When the + * priorities are the same it will It will first compare priority then date + * to maintain a FIFO functionality. + * + *

Note: The date is only accurate to the millisecond which means it is + * possible that two requests were inserted into the queue within a + * millisecond. When that is the case this function will break the tie + * arbitrarily. + */ + @Override + public int compareTo(CompactionRequest request) { + //NOTE: The head of the priority queue is the least element + if (this.equals(request)) { + return 0; //they are the same request + } + int compareVal; + + compareVal = p.compareTo(request.p); //compare priority + if (compareVal != 0) { + return compareVal; + } + + compareVal = date.compareTo(request.date); + if (compareVal != 0) { + return compareVal; + } + + //break the tie arbitrarily + return -1; + } + + /** Gets the HRegion for the request */ + HRegion getHRegion() { + return r; + } + + /** Gets the priority for the request */ + Priority getPriority() { + return p; + } + + public String toString() { + return "regionName=" + r.getRegionNameAsString() + + ", priority=" + p + ", date=" + date; + } + } + + /** The actual blocking queue we delegate to */ + protected final BlockingQueue queue = + new PriorityBlockingQueue(); + + /** Hash map of the HRegions contained within the Compaction Queue */ + private final HashMap regionsInQueue = + new HashMap(); + + /** Creates a new PriorityCompactionQueue with no priority elevation time */ + public PriorityCompactionQueue() { + LOG.debug("Create PriorityCompactionQueue"); + } + + /** If the region is not already in the queue it will add it and return a + * new compaction request object. If it is already present in the queue + * then it will return null. + * @param p If null it will use the default priority + * @return returns a compaction request if it isn't already in the queue + */ + protected CompactionRequest addToRegionsInQueue(HRegion r, Priority p) { + CompactionRequest queuedRequest = null; + CompactionRequest newRequest = new CompactionRequest(r, p); + synchronized (regionsInQueue) { + queuedRequest = regionsInQueue.get(r); + if (queuedRequest == null || + newRequest.getPriority().compareTo(queuedRequest.getPriority()) < 0) { + LOG.trace("Inserting region in queue. " + newRequest); + regionsInQueue.put(r, newRequest); + } else { + LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest + + ", requested: " + newRequest); + newRequest = null; // It is already present so don't add it + } + } + + if (newRequest != null && queuedRequest != null) { + // Remove the lower priority request + queue.remove(queuedRequest); + } + + return newRequest; + } + + /** Removes the request from the regions in queue + * @param p If null it will use the default priority + */ + protected CompactionRequest removeFromRegionsInQueue(HRegion r) { + if (r == null) return null; + + synchronized (regionsInQueue) { + CompactionRequest cr = regionsInQueue.remove(r); + if (cr == null) { + LOG.warn("Removed a region it couldn't find in regionsInQueue: " + r); + } + return cr; + } + } + + public boolean add(HRegion e, Priority p) { + CompactionRequest request = this.addToRegionsInQueue(e, p); + if (request != null) { + boolean result = queue.add(request); + queue.peek(); + return result; + } else { + return false; + } + } + + @Override + public boolean add(HRegion e) { + return add(e, null); + } + + public boolean offer(HRegion e, Priority p) { + CompactionRequest request = this.addToRegionsInQueue(e, p); + return (request != null)? queue.offer(request): false; + } + + @Override + public boolean offer(HRegion e) { + return offer(e, null); + } + + public void put(HRegion e, Priority p) throws InterruptedException { + CompactionRequest request = this.addToRegionsInQueue(e, p); + if (request != null) { + queue.put(request); + } + } + + @Override + public void put(HRegion e) throws InterruptedException { + put(e, null); + } + + public boolean offer(HRegion e, Priority p, long timeout, TimeUnit unit) + throws InterruptedException { + CompactionRequest request = this.addToRegionsInQueue(e, p); + return (request != null)? queue.offer(request, timeout, unit): false; + } + + @Override + public boolean offer(HRegion e, long timeout, TimeUnit unit) + throws InterruptedException { + return offer(e, null, timeout, unit); + } + + @Override + public HRegion take() throws InterruptedException { + CompactionRequest cr = queue.take(); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } + return null; + } + + @Override + public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException { + CompactionRequest cr = queue.poll(timeout, unit); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } + return null; + } + + @Override + public boolean remove(Object r) { + if (r instanceof HRegion) { + CompactionRequest cr = removeFromRegionsInQueue((HRegion) r); + if (cr != null) { + return queue.remove(cr); + } + } + + return false; + } + + @Override + public HRegion remove() { + CompactionRequest cr = queue.remove(); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } + return null; + } + + @Override + public HRegion poll() { + CompactionRequest cr = queue.poll(); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } + return null; + } + + @Override + public int remainingCapacity() { + return queue.remainingCapacity(); + } + + @Override + public boolean contains(Object r) { + if (r instanceof HRegion) { + synchronized (regionsInQueue) { + return regionsInQueue.containsKey((HRegion) r); + } + } else if (r instanceof CompactionRequest) { + return queue.contains(r); + } + return false; + } + + @Override + public HRegion element() { + CompactionRequest cr = queue.element(); + return (cr != null)? cr.getHRegion(): null; + } + + @Override + public HRegion peek() { + CompactionRequest cr = queue.peek(); + return (cr != null)? cr.getHRegion(): null; + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public void clear() { + regionsInQueue.clear(); + queue.clear(); + } + + // Unimplemented methods, collection methods + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public int drainTo(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public int drainTo(Collection c, int maxElements) { + throw new UnsupportedOperationException("Not supported."); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java index 4332f9382b8..114a8ef2ddd 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.regionserver.wal; -import java.io.EOFException; +import java.io.FilterInputStream; import java.io.IOException; import java.lang.Class; import java.lang.reflect.Constructor; @@ -78,18 +78,43 @@ public class SequenceFileLogReader implements HLog.Reader { this.length = l; } + // This section can be confusing. It is specific to how HDFS works. + // Let me try to break it down. This is the problem: + // + // 1. HDFS DataNodes update the NameNode about a filename's length + // on block boundaries or when a file is closed. Therefore, + // if an RS dies, then the NN's fs.getLength() can be out of date + // 2. this.in.available() would work, but it returns int & + // therefore breaks for files > 2GB (happens on big clusters) + // 3. DFSInputStream.getFileLength() gets the actual length from the DNs + // 4. DFSInputStream is wrapped 2 levels deep : this.in.in + // + // So, here we adjust getPos() using getFileLength() so the + // SequenceFile.Reader constructor (aka: first invocation) comes out + // with the correct end of the file: + // this.end = in.getPos() + length; @Override public long getPos() throws IOException { if (this.firstGetPosInvocation) { this.firstGetPosInvocation = false; - // Tell a lie. We're doing this just so that this line up in - // SequenceFile.Reader constructor comes out with the correct length - // on the file: - // this.end = in.getPos() + length; - long available = this.in.available(); - // Length gets added up in the SF.Reader constructor so subtract the - // difference. If available < this.length, then return this.length. - return available >= this.length? available - this.length: this.length; + long adjust = 0; + + try { + Field fIn = FilterInputStream.class.getDeclaredField("in"); + fIn.setAccessible(true); + Object realIn = fIn.get(this.in); + long realLength = ((Long)realIn.getClass(). + getMethod("getFileLength", new Class []{}). + invoke(realIn, new Object []{})).longValue(); + assert(realLength >= this.length); + adjust = realLength - this.length; + } catch(Exception e) { + SequenceFileLogReader.LOG.warn( + "Error while trying to get accurate file length. " + + "Truncation / data loss may occur if RegionServers die.", e); + } + + return adjust + super.getPos(); } return super.getPos(); } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java new file mode 100644 index 00000000000..557456c2fcd --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java @@ -0,0 +1,219 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for the priority compaction queue + */ +public class TestPriorityCompactionQueue { + static final Log LOG = LogFactory.getLog(TestPriorityCompactionQueue.class); + + @Before + public void setUp() { + } + + @After + public void tearDown() { + + } + + class DummyHRegion extends HRegion { + String name; + + DummyHRegion(String name) { + super(); + this.name = name; + } + + public int hashCode() { + return name.hashCode(); + } + + public boolean equals(DummyHRegion r) { + return name.equals(r.name); + } + + public String toString() { + return "[DummyHRegion " + name + "]"; + } + + public String getRegionNameAsString() { + return name; + } + } + + protected void getAndCheckRegion(PriorityCompactionQueue pq, + HRegion checkRegion) { + HRegion r = pq.remove(); + if (r != checkRegion) { + Assert.assertTrue("Didn't get expected " + checkRegion + " got " + r, r + .equals(checkRegion)); + } + } + + protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) { + pq.add(r, p); + try { + // Sleep 10 millisecond so 2 things are not put in the queue within the + // same millisecond. The queue breaks ties arbitrarily between two + // requests inserted at the same time. We want the ordering to + // be consistent for our unit test. + Thread.sleep(1); + } catch (InterruptedException ex) { + // continue + } + } + + // //////////////////////////////////////////////////////////////////////////// + // tests + // //////////////////////////////////////////////////////////////////////////// + + /** tests general functionality of the compaction queue */ + @Test public void testPriorityQueue() throws InterruptedException { + PriorityCompactionQueue pq = new PriorityCompactionQueue(); + + HRegion r1 = new DummyHRegion("r1"); + HRegion r2 = new DummyHRegion("r2"); + HRegion r3 = new DummyHRegion("r3"); + HRegion r4 = new DummyHRegion("r4"); + HRegion r5 = new DummyHRegion("r5"); + + // test 1 + // check fifo w/priority + addRegion(pq, r1, Priority.HIGH_BLOCKING); + addRegion(pq, r2, Priority.HIGH_BLOCKING); + addRegion(pq, r3, Priority.HIGH_BLOCKING); + addRegion(pq, r4, Priority.HIGH_BLOCKING); + addRegion(pq, r5, Priority.HIGH_BLOCKING); + + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r4); + getAndCheckRegion(pq, r5); + + // test 2 + // check fifo + addRegion(pq, r1, null); + addRegion(pq, r2, null); + addRegion(pq, r3, null); + addRegion(pq, r4, null); + addRegion(pq, r5, null); + + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r4); + getAndCheckRegion(pq, r5); + + // test 3 + // check fifo w/mixed priority + addRegion(pq, r1, Priority.HIGH_BLOCKING); + addRegion(pq, r2, Priority.NORMAL); + addRegion(pq, r3, Priority.HIGH_BLOCKING); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.HIGH_BLOCKING); + + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r5); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r4); + + // test 4 + // check fifo w/mixed priority + addRegion(pq, r1, Priority.NORMAL); + addRegion(pq, r2, Priority.NORMAL); + addRegion(pq, r3, Priority.NORMAL); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.HIGH_BLOCKING); + + getAndCheckRegion(pq, r5); + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r4); + + // test 5 + // check fifo w/mixed priority elevation time + addRegion(pq, r1, Priority.NORMAL); + addRegion(pq, r2, Priority.HIGH_BLOCKING); + addRegion(pq, r3, Priority.NORMAL); + Thread.sleep(1000); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.HIGH_BLOCKING); + + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r5); + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r4); + + // reset the priority compaction queue back to a normal queue + pq = new PriorityCompactionQueue(); + + // test 7 + // test that lower priority are removed from the queue when a high priority + // is added + addRegion(pq, r1, Priority.NORMAL); + addRegion(pq, r2, Priority.NORMAL); + addRegion(pq, r3, Priority.NORMAL); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.NORMAL); + addRegion(pq, r3, Priority.HIGH_BLOCKING); + + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r4); + getAndCheckRegion(pq, r5); + + Assert.assertTrue("Queue should be empty.", pq.size() == 0); + + // test 8 + // don't add the same region more than once + addRegion(pq, r1, Priority.NORMAL); + addRegion(pq, r2, Priority.NORMAL); + addRegion(pq, r3, Priority.NORMAL); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.NORMAL); + addRegion(pq, r1, Priority.NORMAL); + addRegion(pq, r2, Priority.NORMAL); + addRegion(pq, r3, Priority.NORMAL); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.NORMAL); + + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r4); + getAndCheckRegion(pq, r5); + + Assert.assertTrue("Queue should be empty.", pq.size() == 0); + } +} \ No newline at end of file