HADOOP-12189. Improve CallQueueManager#swapQueue to make queue elements drop nearly impossible. Contributed by Zhihai Xu.

(cherry picked from commit 6736a1ab70)
This commit is contained in:
Andrew Wang 2015-07-23 14:42:35 -07:00
parent a37285620d
commit cf258b6d98
3 changed files with 24 additions and 12 deletions

View File

@ -214,6 +214,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12161. Add getStoragePolicy API to the FileSystem interface. HADOOP-12161. Add getStoragePolicy API to the FileSystem interface.
(Brahma Reddy Battula via Arpit Agarwal) (Brahma Reddy Battula via Arpit Agarwal)
HADOOP-12189. Improve CallQueueManager#swapQueue to make queue elements
drop nearly impossible. (Zhihai Xu via wang)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -32,11 +32,15 @@ import org.apache.hadoop.conf.Configuration;
*/ */
public class CallQueueManager<E> { public class CallQueueManager<E> {
public static final Log LOG = LogFactory.getLog(CallQueueManager.class); public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
// Number of checkpoints for empty queue.
private static final int CHECKPOINT_NUM = 20;
// Interval to check empty queue.
private static final long CHECKPOINT_INTERVAL_MS = 10;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static <E> Class<? extends BlockingQueue<E>> convertQueueClass( static <E> Class<? extends BlockingQueue<E>> convertQueueClass(
Class<?> queneClass, Class<E> elementClass) { Class<?> queueClass, Class<E> elementClass) {
return (Class<? extends BlockingQueue<E>>)queneClass; return (Class<? extends BlockingQueue<E>>)queueClass;
} }
private final boolean clientBackOffEnabled; private final boolean clientBackOffEnabled;
@ -159,18 +163,23 @@ public class CallQueueManager<E> {
} }
/** /**
* Checks if queue is empty by checking at two points in time. * Checks if queue is empty by checking at CHECKPOINT_NUM points with
* CHECKPOINT_INTERVAL_MS interval.
* This doesn't mean the queue might not fill up at some point later, but * This doesn't mean the queue might not fill up at some point later, but
* it should decrease the probability that we lose a call this way. * it should decrease the probability that we lose a call this way.
*/ */
private boolean queueIsReallyEmpty(BlockingQueue<?> q) { private boolean queueIsReallyEmpty(BlockingQueue<?> q) {
boolean wasEmpty = q.isEmpty(); for (int i = 0; i < CHECKPOINT_NUM; i++) {
try { try {
Thread.sleep(10); Thread.sleep(CHECKPOINT_INTERVAL_MS);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
return false; return false;
} }
return q.isEmpty() && wasEmpty; if (!q.isEmpty()) {
return false;
}
}
return true;
} }
private String stringRepr(Object o) { private String stringRepr(Object o) {

View File

@ -165,7 +165,7 @@ public class TestCallQueueManager {
HashMap<Runnable, Thread> threads = new HashMap<Runnable, Thread>(); HashMap<Runnable, Thread> threads = new HashMap<Runnable, Thread>();
// Create putters and takers // Create putters and takers
for (int i=0; i < 50; i++) { for (int i=0; i < 1000; i++) {
Putter p = new Putter(manager, -1, -1); Putter p = new Putter(manager, -1, -1);
Thread pt = new Thread(p); Thread pt = new Thread(p);
producers.add(p); producers.add(p);
@ -174,7 +174,7 @@ public class TestCallQueueManager {
pt.start(); pt.start();
} }
for (int i=0; i < 20; i++) { for (int i=0; i < 100; i++) {
Taker t = new Taker(manager, -1, -1); Taker t = new Taker(manager, -1, -1);
Thread tt = new Thread(t); Thread tt = new Thread(t);
consumers.add(t); consumers.add(t);
@ -183,7 +183,7 @@ public class TestCallQueueManager {
tt.start(); tt.start();
} }
Thread.sleep(10); Thread.sleep(500);
for (int i=0; i < 5; i++) { for (int i=0; i < 5; i++) {
manager.swapQueue(queueClass, 5000, "", null); manager.swapQueue(queueClass, 5000, "", null);