YARN-2456. Possible livelock in CapacityScheduler when RM is recovering

apps. Contributed by Jian He
This commit is contained in:
XuanGong 2014-09-12 15:24:02 -07:00
parent 5afc3f1dad
commit add5ac6a73
3 changed files with 48 additions and 1 deletions

View File

@ -317,6 +317,9 @@ Release 2.6.0 - UNRELEASED
YARN-2484. FileSystemRMStateStore#readFile/writeFile should close
FSData(In|Out)putStream in final block (Tsuyoshi OZAWA via jlowe)
YARN-2456. Possible livelock in CapacityScheduler when RM is recovering apps.
(Jian He via xgong)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.crypto.SecretKey;
@ -421,7 +422,7 @@ public abstract class RMStateStore extends AbstractService {
*/
public static class RMState {
Map<ApplicationId, ApplicationState> appState =
new HashMap<ApplicationId, ApplicationState>();
new TreeMap<ApplicationId, ApplicationState>();
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
@ -1656,6 +1658,47 @@ public class TestRMRestart {
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
}
@Test (timeout = 20000)
public void testAppRecoveredInOrderOnRMRestart() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
for (int i = 10; i > 0; i--) {
ApplicationState appState = mock(ApplicationState.class);
when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i));
memStore.getState().getApplicationState()
.put(appState.getAppId(), appState);
}
MockRM rm1 = new MockRM(conf, memStore) {
@Override
protected RMAppManager createRMAppManager() {
return new TestRMAppManager(this.rmContext, this.scheduler,
this.masterService, this.applicationACLsManager, conf);
}
class TestRMAppManager extends RMAppManager {
ApplicationId prevId = ApplicationId.newInstance(1234, 0);
public TestRMAppManager(RMContext context, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
super(context, scheduler, masterService, applicationACLsManager, conf);
}
@Override
protected void recoverApplication(ApplicationState appState,
RMState rmState) throws Exception {
// check application is recovered in order.
Assert.assertTrue(rmState.getApplicationState().size() > 0);
Assert.assertTrue(appState.getAppId().compareTo(prevId) > 0);
prevId = appState.getAppId();
}
}
};
rm1.start();
}
@SuppressWarnings("resource")
@Test (timeout = 60000)
public void testQueueMetricsOnRMRestart() throws Exception {