YARN-5874. RM -format-state-store and -remove-application-from-state-store commands fail with NPE. Contributed by Varun Saxena.
(cherry picked from commit b7070f3308
)
This commit is contained in:
parent
fad6de3080
commit
528ef2407c
|
@ -1333,8 +1333,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
* @param conf
|
* @param conf
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
private static void deleteRMStateStore(Configuration conf) throws Exception {
|
@VisibleForTesting
|
||||||
|
static void deleteRMStateStore(Configuration conf) throws Exception {
|
||||||
RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
|
RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
|
||||||
|
rmStore.setResourceManager(new ResourceManager());
|
||||||
rmStore.init(conf);
|
rmStore.init(conf);
|
||||||
rmStore.start();
|
rmStore.start();
|
||||||
try {
|
try {
|
||||||
|
@ -1346,9 +1348,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void removeApplication(Configuration conf, String applicationId)
|
@VisibleForTesting
|
||||||
|
static void removeApplication(Configuration conf, String applicationId)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
|
RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
|
||||||
|
rmStore.setResourceManager(new ResourceManager());
|
||||||
rmStore.init(conf);
|
rmStore.init(conf);
|
||||||
rmStore.start();
|
rmStore.start();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -86,7 +86,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||||
public abstract class RMStateStore extends AbstractService {
|
public abstract class RMStateStore extends AbstractService {
|
||||||
|
|
||||||
// constants for RM App state and RMDTSecretManagerState.
|
// constants for RM App state and RMDTSecretManagerState.
|
||||||
protected static final String RM_APP_ROOT = "RMAppRoot";
|
@VisibleForTesting
|
||||||
|
public static final String RM_APP_ROOT = "RMAppRoot";
|
||||||
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
|
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
|
||||||
protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
|
protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
|
||||||
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
|
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
|
||||||
|
|
|
@ -127,7 +127,8 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
"RMDTSequentialNumber";
|
"RMDTSequentialNumber";
|
||||||
private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
|
private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
|
||||||
"RMDTMasterKeysRoot";
|
"RMDTMasterKeysRoot";
|
||||||
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
|
@VisibleForTesting
|
||||||
|
public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
|
||||||
protected static final Version CURRENT_VERSION_INFO =
|
protected static final Version CURRENT_VERSION_INFO =
|
||||||
Version.newInstance(1, 3);
|
Version.newInstance(1, 3);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
import org.apache.curator.test.TestingServer;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestRMStoreCommands {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFormatStateStoreCmdForZK() throws Exception {
|
||||||
|
StateChangeRequestInfo req = new StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||||
|
try (TestingServer curatorTestingServer =
|
||||||
|
TestZKRMStateStore.setupCuratorServer();
|
||||||
|
CuratorFramework curatorFramework = TestZKRMStateStore.
|
||||||
|
setupCuratorFramework(curatorTestingServer)) {
|
||||||
|
Configuration conf = TestZKRMStateStore.createHARMConf("rm1,rm2", "rm1",
|
||||||
|
1234, false, curatorTestingServer);
|
||||||
|
ResourceManager rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
rm.getRMContext().getRMAdminService().transitionToActive(req);
|
||||||
|
String zkStateRoot = ZKRMStateStore.ROOT_ZNODE_NAME;
|
||||||
|
assertEquals("RM State store parent path should have a child node " +
|
||||||
|
zkStateRoot, zkStateRoot, curatorFramework.getChildren().forPath(
|
||||||
|
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH).get(0));
|
||||||
|
rm.close();
|
||||||
|
try {
|
||||||
|
ResourceManager.deleteRMStateStore(conf);
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Exception should not be thrown during format rm state store" +
|
||||||
|
" operation.");
|
||||||
|
}
|
||||||
|
assertTrue("After store format parent path should have no child nodes",
|
||||||
|
curatorFramework.getChildren().forPath(
|
||||||
|
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH).isEmpty());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveApplicationFromStateStoreCmdForZK() throws Exception {
|
||||||
|
StateChangeRequestInfo req = new StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||||
|
try (TestingServer curatorTestingServer =
|
||||||
|
TestZKRMStateStore.setupCuratorServer();
|
||||||
|
CuratorFramework curatorFramework = TestZKRMStateStore.
|
||||||
|
setupCuratorFramework(curatorTestingServer)) {
|
||||||
|
Configuration conf = TestZKRMStateStore.createHARMConf("rm1,rm2", "rm1",
|
||||||
|
1234, false, curatorTestingServer);
|
||||||
|
ResourceManager rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
rm.getRMContext().getRMAdminService().transitionToActive(req);
|
||||||
|
rm.close();
|
||||||
|
String appId = ApplicationId.newInstance(
|
||||||
|
System.currentTimeMillis(), 1).toString();
|
||||||
|
String appRootPath = YarnConfiguration.
|
||||||
|
DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH + "/"+
|
||||||
|
ZKRMStateStore.ROOT_ZNODE_NAME + "/" + RMStateStore.RM_APP_ROOT;
|
||||||
|
String appIdPath = appRootPath + "/" + appId;
|
||||||
|
curatorFramework.create().forPath(appIdPath);
|
||||||
|
assertEquals("Application node for " + appId + "should exist",
|
||||||
|
appId, curatorFramework.getChildren().forPath(appRootPath).get(0));
|
||||||
|
try {
|
||||||
|
ResourceManager.removeApplication(conf, appId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Exception should not be thrown while removing app from " +
|
||||||
|
"rm state store.");
|
||||||
|
}
|
||||||
|
assertTrue("After remove app from store there should be no child nodes" +
|
||||||
|
" in app root path",
|
||||||
|
curatorFramework.getChildren().forPath(appRootPath).isEmpty());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.ZooDefs.Perms;
|
import org.apache.zookeeper.ZooDefs.Perms;
|
||||||
import org.apache.zookeeper.data.ACL;
|
import org.apache.zookeeper.data.ACL;
|
||||||
|
@ -79,15 +78,26 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
private TestingServer curatorTestingServer;
|
private TestingServer curatorTestingServer;
|
||||||
private CuratorFramework curatorFramework;
|
private CuratorFramework curatorFramework;
|
||||||
|
|
||||||
@Before
|
public static TestingServer setupCuratorServer() throws Exception {
|
||||||
public void setupCuratorServer() throws Exception {
|
TestingServer curatorTestingServer = new TestingServer();
|
||||||
curatorTestingServer = new TestingServer();
|
|
||||||
curatorTestingServer.start();
|
curatorTestingServer.start();
|
||||||
curatorFramework = CuratorFrameworkFactory.builder()
|
return curatorTestingServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CuratorFramework setupCuratorFramework(
|
||||||
|
TestingServer curatorTestingServer) throws Exception {
|
||||||
|
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
|
||||||
.connectString(curatorTestingServer.getConnectString())
|
.connectString(curatorTestingServer.getConnectString())
|
||||||
.retryPolicy(new RetryNTimes(100, 100))
|
.retryPolicy(new RetryNTimes(100, 100))
|
||||||
.build();
|
.build();
|
||||||
curatorFramework.start();
|
curatorFramework.start();
|
||||||
|
return curatorFramework;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setupCurator() throws Exception {
|
||||||
|
curatorTestingServer = setupCuratorServer();
|
||||||
|
curatorFramework = setupCuratorFramework(curatorTestingServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -243,19 +253,21 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
Assert.assertEquals(defaultVersion, store.loadVersion());
|
Assert.assertEquals(defaultVersion, store.loadVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Configuration createHARMConf(
|
public static Configuration createHARMConf(String rmIds, String rmId,
|
||||||
String rmIds, String rmId, int adminPort) {
|
int adminPort, boolean autoFailoverEnabled,
|
||||||
|
TestingServer curatorTestServer) {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
|
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
|
||||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
|
||||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||||
curatorTestingServer.getConnectString());
|
curatorTestServer.getConnectString());
|
||||||
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
|
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
|
||||||
|
conf.setBoolean(
|
||||||
|
YarnConfiguration.AUTO_FAILOVER_ENABLED, autoFailoverEnabled);
|
||||||
for (String rpcAddress : YarnConfiguration.getServiceAddressConfKeys(conf)) {
|
for (String rpcAddress : YarnConfiguration.getServiceAddressConfKeys(conf)) {
|
||||||
for (String id : HAUtil.getRMHAIds(conf)) {
|
for (String id : HAUtil.getRMHAIds(conf)) {
|
||||||
conf.set(HAUtil.addSuffix(rpcAddress, id), "localhost:0");
|
conf.set(HAUtil.addSuffix(rpcAddress, id), "localhost:0");
|
||||||
|
@ -293,8 +305,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
ZKRMStateStore.ROOT_ZNODE_NAME;
|
ZKRMStateStore.ROOT_ZNODE_NAME;
|
||||||
|
|
||||||
// Start RM with HA enabled
|
// Start RM with HA enabled
|
||||||
Configuration conf = createHARMConf("rm1,rm2", "rm1", 1234);
|
Configuration conf =
|
||||||
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
createHARMConf("rm1,rm2", "rm1", 1234, false, curatorTestingServer);
|
||||||
ResourceManager rm = new MockRM(conf);
|
ResourceManager rm = new MockRM(conf);
|
||||||
rm.start();
|
rm.start();
|
||||||
rm.getRMContext().getRMAdminService().transitionToActive(req);
|
rm.getRMContext().getRMAdminService().transitionToActive(req);
|
||||||
|
@ -336,8 +348,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
StateChangeRequestInfo req = new StateChangeRequestInfo(
|
StateChangeRequestInfo req = new StateChangeRequestInfo(
|
||||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||||
|
|
||||||
Configuration conf1 = createHARMConf("rm1,rm2", "rm1", 1234);
|
Configuration conf1 =
|
||||||
conf1.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
createHARMConf("rm1,rm2", "rm1", 1234, false, curatorTestingServer);
|
||||||
ResourceManager rm1 = new MockRM(conf1);
|
ResourceManager rm1 = new MockRM(conf1);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
rm1.getRMContext().getRMAdminService().transitionToActive(req);
|
rm1.getRMContext().getRMAdminService().transitionToActive(req);
|
||||||
|
@ -347,8 +359,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
HAServiceProtocol.HAServiceState.ACTIVE,
|
HAServiceProtocol.HAServiceState.ACTIVE,
|
||||||
rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
|
rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
|
||||||
|
|
||||||
Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678);
|
Configuration conf2 =
|
||||||
conf2.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
createHARMConf("rm1,rm2", "rm2", 5678, false, curatorTestingServer);
|
||||||
ResourceManager rm2 = new MockRM(conf2);
|
ResourceManager rm2 = new MockRM(conf2);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
rm2.getRMContext().getRMAdminService().transitionToActive(req);
|
rm2.getRMContext().getRMAdminService().transitionToActive(req);
|
||||||
|
|
Loading…
Reference in New Issue