HADOOP-13290. Appropriate use of generics in FairCallQueue. Contributed by Jonathan Hung.
This commit is contained in:
parent
d18050522c
commit
728bf7f698
|
@ -300,7 +300,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
||||||
@Override
|
@Override
|
||||||
public int size() {
|
public int size() {
|
||||||
int size = 0;
|
int size = 0;
|
||||||
for (BlockingQueue q : this.queues) {
|
for (BlockingQueue<E> q : this.queues) {
|
||||||
size += q.size();
|
size += q.size();
|
||||||
}
|
}
|
||||||
return size;
|
return size;
|
||||||
|
@ -346,7 +346,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
||||||
@Override
|
@Override
|
||||||
public int remainingCapacity() {
|
public int remainingCapacity() {
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
for (BlockingQueue q : this.queues) {
|
for (BlockingQueue<E> q : this.queues) {
|
||||||
sum += q.remainingCapacity();
|
sum += q.remainingCapacity();
|
||||||
}
|
}
|
||||||
return sum;
|
return sum;
|
||||||
|
@ -362,7 +362,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
||||||
new HashMap<String, MetricsProxy>();
|
new HashMap<String, MetricsProxy>();
|
||||||
|
|
||||||
// Weakref for delegate, so we don't retain it forever if it can be GC'd
|
// Weakref for delegate, so we don't retain it forever if it can be GC'd
|
||||||
private WeakReference<FairCallQueue> delegate;
|
private WeakReference<FairCallQueue<? extends Schedulable>> delegate;
|
||||||
|
|
||||||
// Keep track of how many objects we registered
|
// Keep track of how many objects we registered
|
||||||
private int revisionNumber = 0;
|
private int revisionNumber = 0;
|
||||||
|
@ -381,14 +381,15 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
||||||
return mp;
|
return mp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setDelegate(FairCallQueue obj) {
|
public void setDelegate(FairCallQueue<? extends Schedulable> obj) {
|
||||||
this.delegate = new WeakReference<FairCallQueue>(obj);
|
this.delegate
|
||||||
|
= new WeakReference<FairCallQueue<? extends Schedulable>>(obj);
|
||||||
this.revisionNumber++;
|
this.revisionNumber++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int[] getQueueSizes() {
|
public int[] getQueueSizes() {
|
||||||
FairCallQueue obj = this.delegate.get();
|
FairCallQueue<? extends Schedulable> obj = this.delegate.get();
|
||||||
if (obj == null) {
|
if (obj == null) {
|
||||||
return new int[]{};
|
return new int[]{};
|
||||||
}
|
}
|
||||||
|
@ -398,7 +399,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long[] getOverflowedCalls() {
|
public long[] getOverflowedCalls() {
|
||||||
FairCallQueue obj = this.delegate.get();
|
FairCallQueue<? extends Schedulable> obj = this.delegate.get();
|
||||||
if (obj == null) {
|
if (obj == null) {
|
||||||
return new long[]{};
|
return new long[]{};
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,9 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import javax.management.MBeanServer;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
@ -392,4 +395,20 @@ public class TestFairCallQueue extends TestCase {
|
||||||
assertEquals(call, fcq.take());
|
assertEquals(call, fcq.take());
|
||||||
assertEquals(0, fcq.size());
|
assertEquals(0, fcq.size());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
public void testFairCallQueueMXBean() throws Exception {
|
||||||
|
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
ObjectName mxbeanName = new ObjectName(
|
||||||
|
"Hadoop:service=ns,name=FairCallQueue");
|
||||||
|
|
||||||
|
Schedulable call = mockCall("c");
|
||||||
|
fcq.put(call);
|
||||||
|
int[] queueSizes = (int[]) mbs.getAttribute(mxbeanName, "QueueSizes");
|
||||||
|
assertEquals(1, queueSizes[0]);
|
||||||
|
assertEquals(0, queueSizes[1]);
|
||||||
|
fcq.take();
|
||||||
|
queueSizes = (int[]) mbs.getAttribute(mxbeanName, "QueueSizes");
|
||||||
|
assertEquals(0, queueSizes[0]);
|
||||||
|
assertEquals(0, queueSizes[1]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue