YARN-5999. AMRMClientAsync will stop if any exceptions thrown on allocate call. Contributed by Jian He

This commit is contained in:
Xuan 2016-12-14 15:05:42 -08:00
parent 85083567be
commit 236dbe3485
2 changed files with 14 additions and 19 deletions

View File

@ -60,14 +60,12 @@ extends AMRMClientAsync<T> {
private final HeartbeatThread heartbeatThread;
private final CallbackHandlerThread handlerThread;
private final BlockingQueue<AllocateResponse> responseQueue;
private final BlockingQueue<Object> responseQueue;
private final Object unregisterHeartbeatLock = new Object();
private volatile boolean keepRunning;
private volatile float progress;
private volatile Throwable savedException;
/**
*
@ -87,7 +85,6 @@ extends AMRMClientAsync<T> {
handlerThread = new CallbackHandlerThread();
responseQueue = new LinkedBlockingQueue<>();
keepRunning = true;
savedException = null;
}
/**
@ -108,9 +105,8 @@ extends AMRMClientAsync<T> {
super(client, intervalMs, callbackHandler);
heartbeatThread = new HeartbeatThread();
handlerThread = new CallbackHandlerThread();
responseQueue = new LinkedBlockingQueue<AllocateResponse>();
responseQueue = new LinkedBlockingQueue<Object>();
keepRunning = true;
savedException = null;
}
@Override
@ -262,7 +258,7 @@ extends AMRMClientAsync<T> {
public void run() {
while (true) {
AllocateResponse response = null;
Object response = null;
// synchronization ensures we don't send heartbeats after unregistering
synchronized (unregisterHeartbeatLock) {
if (!keepRunning) {
@ -277,10 +273,7 @@ extends AMRMClientAsync<T> {
return;
} catch (Throwable ex) {
LOG.error("Exception on heartbeat", ex);
savedException = ex;
// interrupt handler thread in case it waiting on the queue
handlerThread.interrupt();
return;
response = ex;
}
if (response != null) {
while (true) {
@ -313,18 +306,20 @@ extends AMRMClientAsync<T> {
return;
}
try {
AllocateResponse response;
if(savedException != null) {
LOG.error("Stopping callback due to: ", savedException);
handler.onError(savedException);
return;
}
Object object;
try {
response = responseQueue.take();
object = responseQueue.take();
} catch (InterruptedException ex) {
LOG.info("Interrupted while waiting for queue", ex);
continue;
}
if (object instanceof Throwable) {
progress = handler.getProgress();
handler.onError((Throwable) object);
continue;
}
AllocateResponse response = (AllocateResponse) object;
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {
handler.onNodesUpdated(updatedNodes);

View File

@ -213,7 +213,7 @@ public class TestAMRMClientAsync {
asyncClient.stop();
// stopping should have joined all threads and completed all callbacks
Assert.assertTrue(callbackHandler.callbackCount == 0);
Assert.assertTrue(callbackHandler.callbackCount > 0);
}
@Test (timeout = 10000)