HADOOP-13290. Appropriate use of generics in FairCallQueue. Contributed by Jonathan Hung.

(cherry picked from commit 728bf7f6988e13f17f03d2e3a4037b7238c4fd45)
(cherry picked from commit b95f1af8a9aafb3e482c7b8825f2097f2ceb0e8e)
(cherry picked from commit cbd885b6fa26b7a84a4a19d6084d95665f21378a)
(cherry picked from commit 45ed97e0d2a5a842842c3738f0413345aa633184)
This commit is contained in:
Zhe Zhang 2016-07-13 16:37:40 -07:00
parent 9684b659ff
commit fc335e5297
2 changed files with 28 additions and 8 deletions

View File

@ -313,7 +313,7 @@ public E peek() {
@Override @Override
public int size() { public int size() {
int size = 0; int size = 0;
for (BlockingQueue q : this.queues) { for (BlockingQueue<E> q : this.queues) {
size += q.size(); size += q.size();
} }
return size; return size;
@ -359,7 +359,7 @@ public int drainTo(Collection<? super E> c) {
@Override @Override
public int remainingCapacity() { public int remainingCapacity() {
int sum = 0; int sum = 0;
for (BlockingQueue q : this.queues) { for (BlockingQueue<E> q : this.queues) {
sum += q.remainingCapacity(); sum += q.remainingCapacity();
} }
return sum; return sum;
@ -375,7 +375,7 @@ private static final class MetricsProxy implements FairCallQueueMXBean {
new HashMap<String, MetricsProxy>(); new HashMap<String, MetricsProxy>();
// Weakref for delegate, so we don't retain it forever if it can be GC'd // Weakref for delegate, so we don't retain it forever if it can be GC'd
private WeakReference<FairCallQueue> delegate; private WeakReference<FairCallQueue<? extends Schedulable>> delegate;
// Keep track of how many objects we registered // Keep track of how many objects we registered
private int revisionNumber = 0; private int revisionNumber = 0;
@ -394,14 +394,15 @@ public static synchronized MetricsProxy getInstance(String namespace) {
return mp; return mp;
} }
public void setDelegate(FairCallQueue obj) { public void setDelegate(FairCallQueue<? extends Schedulable> obj) {
this.delegate = new WeakReference<FairCallQueue>(obj); this.delegate
= new WeakReference<FairCallQueue<? extends Schedulable>>(obj);
this.revisionNumber++; this.revisionNumber++;
} }
@Override @Override
public int[] getQueueSizes() { public int[] getQueueSizes() {
FairCallQueue obj = this.delegate.get(); FairCallQueue<? extends Schedulable> obj = this.delegate.get();
if (obj == null) { if (obj == null) {
return new int[]{}; return new int[]{};
} }
@ -411,7 +412,7 @@ public int[] getQueueSizes() {
@Override @Override
public long[] getOverflowedCalls() { public long[] getOverflowedCalls() {
FairCallQueue obj = this.delegate.get(); FairCallQueue<? extends Schedulable> obj = this.delegate.get();
if (obj == null) { if (obj == null) {
return new long[]{}; return new long[]{};
} }

View File

@ -23,6 +23,9 @@
import junit.framework.TestCase; import junit.framework.TestCase;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -394,4 +397,20 @@ public void testTakeTriesNextQueue() throws InterruptedException {
assertEquals(call, fcq.take()); assertEquals(call, fcq.take());
assertEquals(0, fcq.size()); assertEquals(0, fcq.size());
} }
public void testFairCallQueueMXBean() throws Exception {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName(
"Hadoop:service=ns,name=FairCallQueue");
Schedulable call = mockCall("c");
fcq.put(call);
int[] queueSizes = (int[]) mbs.getAttribute(mxbeanName, "QueueSizes");
assertEquals(1, queueSizes[0]);
assertEquals(0, queueSizes[1]);
fcq.take();
queueSizes = (int[]) mbs.getAttribute(mxbeanName, "QueueSizes");
assertEquals(0, queueSizes[0]);
assertEquals(0, queueSizes[1]);
}
} }