YARN-1121. Changed ResourceManager's state-store to drain all events on shut-down. Contributed by Jian He.

svn merge --ignore-ancestry -c 1540232 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1540233 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-11-09 00:56:26 +00:00
parent 76a7119930
commit ec91e5da1b
5 changed files with 119 additions and 10 deletions

View File

@ -70,6 +70,9 @@ Release 2.3.0 - UNRELEASED
YARN-1323. Set HTTPS webapp address along with other RPC addresses in HAUtil
(Karthik Kambatla via Sandy Ryza)
YARN-1121. Changed ResourceManager's state-store to drain all events on
shut-down. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -49,6 +49,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
private final BlockingQueue<Event> eventQueue;
private volatile boolean stopped = false;
// Configuration flag for enabling/disabling draining dispatcher's events on
// stop functionality.
private volatile boolean drainEventsOnStop = false;
// Indicates all the remaining dispatcher's events on stop have been drained
// and processed.
private volatile boolean drained = true;
// For drainEventsOnStop enabled only, block newly coming events into the
// queue while stopping.
private volatile boolean blockNewEvents = false;
private EventHandler handlerInstance = null;
private Thread eventHandlingThread;
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
private boolean exitOnDispatchException;
@ -68,6 +81,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
Event event;
try {
event = eventQueue.take();
@ -102,8 +116,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
eventHandlingThread.start();
}
public void setDrainEventsOnStop() {
drainEventsOnStop = true;
}
@Override
protected void serviceStop() throws Exception {
if (drainEventsOnStop) {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
while(!drained) {
Thread.yield();
}
}
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
@ -173,11 +198,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
@Override
public EventHandler getEventHandler() {
return new GenericEventHandler();
if (handlerInstance == null) {
handlerInstance = new GenericEventHandler();
}
return handlerInstance;
}
class GenericEventHandler implements EventHandler<Event> {
public void handle(Event event) {
if (blockNewEvents) {
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
if (qSize !=0 && qSize %1000 == 0) {

View File

@ -262,16 +262,19 @@ public abstract class RMStateStore extends AbstractService {
AsyncDispatcher dispatcher;
public synchronized void serviceInit(Configuration conf) throws Exception{
@Override
protected void serviceInit(Configuration conf) throws Exception{
// create async handler
dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.register(RMStateStoreEventType.class,
new ForwardingEventHandler());
dispatcher.setDrainEventsOnStop();
initInternal(conf);
}
protected synchronized void serviceStart() throws Exception {
@Override
protected void serviceStart() throws Exception {
dispatcher.start();
startInternal();
}
@ -288,7 +291,8 @@ public abstract class RMStateStore extends AbstractService {
*/
protected abstract void startInternal() throws Exception;
public synchronized void serviceStop() throws Exception {
@Override
protected void serviceStop() throws Exception {
closeInternal();
dispatcher.stop();
}
@ -509,8 +513,7 @@ public abstract class RMStateStore extends AbstractService {
}
// Dispatcher related code
private synchronized void handleStoreEvent(RMStateStoreEvent event) {
protected void handleStoreEvent(RMStateStoreEvent event) {
if (event.getType().equals(RMStateStoreEventType.STORE_APP)
|| event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
ApplicationState appState = null;

View File

@ -163,6 +163,14 @@ public class MockRM extends ResourceManager {
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, true);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted) throws Exception {
ApplicationClientProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@ -222,7 +230,9 @@ public class MockRM extends ResourceManager {
}.setClientReq(client, req);
fakeUser.doAs(action);
// make sure app is immediately available after submit
waitForState(appId, RMAppState.ACCEPTED);
if (waitForAccepted) {
waitForState(appId, RMAppState.ACCEPTED);
}
return getRMContext().getRMApps().get(appId);
}

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -1062,6 +1063,65 @@ public class TestRMRestart {
rm2.stop();
}
@Test
public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore() {
volatile boolean wait = true;
@Override
public void serviceStop() throws Exception {
// Unblock app saving request.
wait = false;
super.serviceStop();
}
@Override
protected void handleStoreEvent(RMStateStoreEvent event) {
// Block app saving request.
while (wait);
super.handleStoreEvent(event);
}
};
memStore.init(conf);
// start RM
final MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
// create apps.
final ArrayList<RMApp> appList = new ArrayList<RMApp>();
final int NUM_APPS = 5;
for (int i = 0; i < NUM_APPS; i++) {
RMApp app = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false,
"default", -1, null, "MAPREDUCE", false);
appList.add(app);
rm1.waitForState(app.getApplicationId(), RMAppState.NEW_SAVING);
}
// all apps's saving request are now enqueued to RMStateStore's dispatcher
// queue, and will be processed once rm.stop() is called.
// Nothing exist in state store before stop is called.
Map<ApplicationId, ApplicationState> rmAppState =
memStore.getState().getApplicationState();
Assert.assertTrue(rmAppState.size() == 0);
// stop rm
rm1.stop();
// Assert app info is still saved even if stop is called with pending saving
// request on dispatcher.
for (RMApp app : appList) {
ApplicationState appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
.getApplicationId(), app.getApplicationSubmissionContext()
.getApplicationId());
}
Assert.assertTrue(rmAppState.size() == NUM_APPS);
}
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {