HADOOP-11145. TestFairCallQueue fails. Contributed by Akira AJISAKA.
This commit is contained in:
parent
8dc4e9408f
commit
b9158697a4
|
@ -898,6 +898,8 @@ Release 2.6.0 - UNRELEASED
|
||||||
HADOOP-11154. Update BUILDING.txt to state that CMake 3.0 or newer is
|
HADOOP-11154. Update BUILDING.txt to state that CMake 3.0 or newer is
|
||||||
required on Mac. (cnauroth)
|
required on Mac. (cnauroth)
|
||||||
|
|
||||||
|
HADOOP-11145. TestFairCallQueue fails. (Akira AJISAKA via cnauroth)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
|
||||||
|
@ -243,11 +244,14 @@ public class TestFairCallQueue extends TestCase {
|
||||||
public final String tag;
|
public final String tag;
|
||||||
public volatile int callsAdded = 0; // How many calls we added, accurate unless interrupted
|
public volatile int callsAdded = 0; // How many calls we added, accurate unless interrupted
|
||||||
private final int maxCalls;
|
private final int maxCalls;
|
||||||
|
private final CountDownLatch latch;
|
||||||
|
|
||||||
public Putter(BlockingQueue<Schedulable> aCq, int maxCalls, String tag) {
|
public Putter(BlockingQueue<Schedulable> aCq, int maxCalls, String tag,
|
||||||
|
CountDownLatch latch) {
|
||||||
this.maxCalls = maxCalls;
|
this.maxCalls = maxCalls;
|
||||||
this.cq = aCq;
|
this.cq = aCq;
|
||||||
this.tag = tag;
|
this.tag = tag;
|
||||||
|
this.latch = latch;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getTag() {
|
private String getTag() {
|
||||||
|
@ -262,6 +266,7 @@ public class TestFairCallQueue extends TestCase {
|
||||||
while (callsAdded < maxCalls || maxCalls < 0) {
|
while (callsAdded < maxCalls || maxCalls < 0) {
|
||||||
cq.put(mockCall(getTag()));
|
cq.put(mockCall(getTag()));
|
||||||
callsAdded++;
|
callsAdded++;
|
||||||
|
latch.countDown();
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
return;
|
return;
|
||||||
|
@ -280,14 +285,17 @@ public class TestFairCallQueue extends TestCase {
|
||||||
public volatile int callsTaken = 0; // total calls taken, accurate if we aren't interrupted
|
public volatile int callsTaken = 0; // total calls taken, accurate if we aren't interrupted
|
||||||
public volatile Schedulable lastResult = null; // the last thing we took
|
public volatile Schedulable lastResult = null; // the last thing we took
|
||||||
private final int maxCalls; // maximum calls to take
|
private final int maxCalls; // maximum calls to take
|
||||||
|
private final CountDownLatch latch;
|
||||||
|
|
||||||
private IdentityProvider uip;
|
private IdentityProvider uip;
|
||||||
|
|
||||||
public Taker(BlockingQueue<Schedulable> aCq, int maxCalls, String tag) {
|
public Taker(BlockingQueue<Schedulable> aCq, int maxCalls, String tag,
|
||||||
|
CountDownLatch latch) {
|
||||||
this.maxCalls = maxCalls;
|
this.maxCalls = maxCalls;
|
||||||
this.cq = aCq;
|
this.cq = aCq;
|
||||||
this.tag = tag;
|
this.tag = tag;
|
||||||
this.uip = new UserIdentityProvider();
|
this.uip = new UserIdentityProvider();
|
||||||
|
this.latch = latch;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -303,6 +311,7 @@ public class TestFairCallQueue extends TestCase {
|
||||||
cq.put(res);
|
cq.put(res);
|
||||||
} else {
|
} else {
|
||||||
callsTaken++;
|
callsTaken++;
|
||||||
|
latch.countDown();
|
||||||
lastResult = res;
|
lastResult = res;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -316,10 +325,11 @@ public class TestFairCallQueue extends TestCase {
|
||||||
public void assertCanTake(BlockingQueue<Schedulable> cq, int numberOfTakes,
|
public void assertCanTake(BlockingQueue<Schedulable> cq, int numberOfTakes,
|
||||||
int takeAttempts) throws InterruptedException {
|
int takeAttempts) throws InterruptedException {
|
||||||
|
|
||||||
Taker taker = new Taker(cq, takeAttempts, "default");
|
CountDownLatch latch = new CountDownLatch(numberOfTakes);
|
||||||
|
Taker taker = new Taker(cq, takeAttempts, "default", latch);
|
||||||
Thread t = new Thread(taker);
|
Thread t = new Thread(taker);
|
||||||
t.start();
|
t.start();
|
||||||
t.join(100);
|
latch.await();
|
||||||
|
|
||||||
assertEquals(numberOfTakes, taker.callsTaken);
|
assertEquals(numberOfTakes, taker.callsTaken);
|
||||||
t.interrupt();
|
t.interrupt();
|
||||||
|
@ -329,10 +339,11 @@ public class TestFairCallQueue extends TestCase {
|
||||||
public void assertCanPut(BlockingQueue<Schedulable> cq, int numberOfPuts,
|
public void assertCanPut(BlockingQueue<Schedulable> cq, int numberOfPuts,
|
||||||
int putAttempts) throws InterruptedException {
|
int putAttempts) throws InterruptedException {
|
||||||
|
|
||||||
Putter putter = new Putter(cq, putAttempts, null);
|
CountDownLatch latch = new CountDownLatch(numberOfPuts);
|
||||||
|
Putter putter = new Putter(cq, putAttempts, null, latch);
|
||||||
Thread t = new Thread(putter);
|
Thread t = new Thread(putter);
|
||||||
t.start();
|
t.start();
|
||||||
t.join(100);
|
latch.await();
|
||||||
|
|
||||||
assertEquals(numberOfPuts, putter.callsAdded);
|
assertEquals(numberOfPuts, putter.callsAdded);
|
||||||
t.interrupt();
|
t.interrupt();
|
||||||
|
|
Loading…
Reference in New Issue