HADOOP-11145. TestFairCallQueue fails. Contributed by Akira AJISAKA.

(cherry picked from commit b9158697a4)
This commit is contained in:
cnauroth 2014-09-30 08:57:05 -07:00
parent 781833f553
commit 57b145c3ab
2 changed files with 19 additions and 6 deletions

View File

@ -560,6 +560,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-11154. Update BUILDING.txt to state that CMake 3.0 or newer is HADOOP-11154. Update BUILDING.txt to state that CMake 3.0 or newer is
required on Mac. (cnauroth) required on Mac. (cnauroth)
HADOOP-11145. TestFairCallQueue fails. (Akira AJISAKA via cnauroth)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
import junit.framework.TestCase; import junit.framework.TestCase;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -243,11 +244,14 @@ public class TestFairCallQueue extends TestCase {
public final String tag; public final String tag;
public volatile int callsAdded = 0; // How many calls we added, accurate unless interrupted public volatile int callsAdded = 0; // How many calls we added, accurate unless interrupted
private final int maxCalls; private final int maxCalls;
private final CountDownLatch latch;
public Putter(BlockingQueue<Schedulable> aCq, int maxCalls, String tag) { public Putter(BlockingQueue<Schedulable> aCq, int maxCalls, String tag,
CountDownLatch latch) {
this.maxCalls = maxCalls; this.maxCalls = maxCalls;
this.cq = aCq; this.cq = aCq;
this.tag = tag; this.tag = tag;
this.latch = latch;
} }
private String getTag() { private String getTag() {
@ -262,6 +266,7 @@ public class TestFairCallQueue extends TestCase {
while (callsAdded < maxCalls || maxCalls < 0) { while (callsAdded < maxCalls || maxCalls < 0) {
cq.put(mockCall(getTag())); cq.put(mockCall(getTag()));
callsAdded++; callsAdded++;
latch.countDown();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
@ -280,14 +285,17 @@ public class TestFairCallQueue extends TestCase {
public volatile int callsTaken = 0; // total calls taken, accurate if we aren't interrupted public volatile int callsTaken = 0; // total calls taken, accurate if we aren't interrupted
public volatile Schedulable lastResult = null; // the last thing we took public volatile Schedulable lastResult = null; // the last thing we took
private final int maxCalls; // maximum calls to take private final int maxCalls; // maximum calls to take
private final CountDownLatch latch;
private IdentityProvider uip; private IdentityProvider uip;
public Taker(BlockingQueue<Schedulable> aCq, int maxCalls, String tag) { public Taker(BlockingQueue<Schedulable> aCq, int maxCalls, String tag,
CountDownLatch latch) {
this.maxCalls = maxCalls; this.maxCalls = maxCalls;
this.cq = aCq; this.cq = aCq;
this.tag = tag; this.tag = tag;
this.uip = new UserIdentityProvider(); this.uip = new UserIdentityProvider();
this.latch = latch;
} }
@Override @Override
@ -303,6 +311,7 @@ public class TestFairCallQueue extends TestCase {
cq.put(res); cq.put(res);
} else { } else {
callsTaken++; callsTaken++;
latch.countDown();
lastResult = res; lastResult = res;
} }
} }
@ -316,10 +325,11 @@ public class TestFairCallQueue extends TestCase {
public void assertCanTake(BlockingQueue<Schedulable> cq, int numberOfTakes, public void assertCanTake(BlockingQueue<Schedulable> cq, int numberOfTakes,
int takeAttempts) throws InterruptedException { int takeAttempts) throws InterruptedException {
Taker taker = new Taker(cq, takeAttempts, "default"); CountDownLatch latch = new CountDownLatch(numberOfTakes);
Taker taker = new Taker(cq, takeAttempts, "default", latch);
Thread t = new Thread(taker); Thread t = new Thread(taker);
t.start(); t.start();
t.join(100); latch.await();
assertEquals(numberOfTakes, taker.callsTaken); assertEquals(numberOfTakes, taker.callsTaken);
t.interrupt(); t.interrupt();
@ -329,10 +339,11 @@ public class TestFairCallQueue extends TestCase {
public void assertCanPut(BlockingQueue<Schedulable> cq, int numberOfPuts, public void assertCanPut(BlockingQueue<Schedulable> cq, int numberOfPuts,
int putAttempts) throws InterruptedException { int putAttempts) throws InterruptedException {
Putter putter = new Putter(cq, putAttempts, null); CountDownLatch latch = new CountDownLatch(numberOfPuts);
Putter putter = new Putter(cq, putAttempts, null, latch);
Thread t = new Thread(putter); Thread t = new Thread(putter);
t.start(); t.start();
t.join(100); latch.await();
assertEquals(numberOfPuts, putter.callsAdded); assertEquals(numberOfPuts, putter.callsAdded);
t.interrupt(); t.interrupt();