YARN-2456. Possible livelock in CapacityScheduler when RM is recovering
apps. Contributed by Jian He
This commit is contained in:
parent
40364dc47c
commit
e65ae575a0
|
@ -345,6 +345,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2484. FileSystemRMStateStore#readFile/writeFile should close
|
YARN-2484. FileSystemRMStateStore#readFile/writeFile should close
|
||||||
FSData(In|Out)putStream in final block (Tsuyoshi OZAWA via jlowe)
|
FSData(In|Out)putStream in final block (Tsuyoshi OZAWA via jlowe)
|
||||||
|
|
||||||
|
YARN-2456. Possible livelock in CapacityScheduler when RM is recovering apps.
|
||||||
|
(Jian He via xgong)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
|
||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
|
@ -421,7 +422,7 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
*/
|
*/
|
||||||
public static class RMState {
|
public static class RMState {
|
||||||
Map<ApplicationId, ApplicationState> appState =
|
Map<ApplicationId, ApplicationState> appState =
|
||||||
new HashMap<ApplicationId, ApplicationState>();
|
new TreeMap<ApplicationId, ApplicationState>();
|
||||||
|
|
||||||
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
|
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
|
||||||
|
|
||||||
|
|
|
@ -19,9 +19,11 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
|
@ -1656,6 +1658,47 @@ public class TestRMRestart {
|
||||||
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 20000)
|
||||||
|
public void testAppRecoveredInOrderOnRMRestart() throws Exception {
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
for (int i = 10; i > 0; i--) {
|
||||||
|
ApplicationState appState = mock(ApplicationState.class);
|
||||||
|
when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i));
|
||||||
|
memStore.getState().getApplicationState()
|
||||||
|
.put(appState.getAppId(), appState);
|
||||||
|
}
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected RMAppManager createRMAppManager() {
|
||||||
|
return new TestRMAppManager(this.rmContext, this.scheduler,
|
||||||
|
this.masterService, this.applicationACLsManager, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
class TestRMAppManager extends RMAppManager {
|
||||||
|
ApplicationId prevId = ApplicationId.newInstance(1234, 0);
|
||||||
|
|
||||||
|
public TestRMAppManager(RMContext context, YarnScheduler scheduler,
|
||||||
|
ApplicationMasterService masterService,
|
||||||
|
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
||||||
|
super(context, scheduler, masterService, applicationACLsManager, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void recoverApplication(ApplicationState appState,
|
||||||
|
RMState rmState) throws Exception {
|
||||||
|
// check application is recovered in order.
|
||||||
|
Assert.assertTrue(rmState.getApplicationState().size() > 0);
|
||||||
|
Assert.assertTrue(appState.getAppId().compareTo(prevId) > 0);
|
||||||
|
prevId = appState.getAppId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm1.start();
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testQueueMetricsOnRMRestart() throws Exception {
|
public void testQueueMetricsOnRMRestart() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue