YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1609280 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-07-09 20:03:18 +00:00
parent b05080053e
commit 02f72d7a6b
12 changed files with 146 additions and 41 deletions

View File

@ -6,6 +6,8 @@ Release 2.6.0 - UNRELEASED
NEW FEATURES NEW FEATURES
YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha)
IMPROVEMENTS IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu YARN-2242. Improve exception information on AM launch crashes. (Li Lu

View File

@ -61,6 +61,7 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
function print_usage(){ function print_usage(){
echo "Usage: yarn [--config confdir] COMMAND" echo "Usage: yarn [--config confdir] COMMAND"
echo "where COMMAND is one of:" echo "where COMMAND is one of:"
echo " resourcemanager -format deletes the RMStateStore"
echo " resourcemanager run the ResourceManager" echo " resourcemanager run the ResourceManager"
echo " nodemanager run a nodemanager on each slave" echo " nodemanager run a nodemanager on each slave"
echo " timelineserver run the timeline server" echo " timelineserver run the timeline server"

View File

@ -1035,12 +1035,17 @@ public static void main(String argv[]) {
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try { try {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
// If -format, then delete RMStateStore; else startup normally
if (argv.length == 1 && argv[0].equals("-format")) {
deleteRMStateStore(conf);
} else {
ResourceManager resourceManager = new ResourceManager(); ResourceManager resourceManager = new ResourceManager();
ShutdownHookManager.get().addShutdownHook( ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager), new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY); SHUTDOWN_HOOK_PRIORITY);
resourceManager.init(conf); resourceManager.init(conf);
resourceManager.start(); resourceManager.start();
}
} catch (Throwable t) { } catch (Throwable t) {
LOG.fatal("Error starting ResourceManager", t); LOG.fatal("Error starting ResourceManager", t);
System.exit(-1); System.exit(-1);
@ -1077,4 +1082,23 @@ public static InetSocketAddress getBindAddress(Configuration conf) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
} }
/**
* Deletes the RMStateStore
*
* @param conf
* @throws Exception
*/
private static void deleteRMStateStore(Configuration conf) throws Exception {
RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
rmStore.init(conf);
rmStore.start();
try {
LOG.info("Deleting ResourceManager state store...");
rmStore.deleteStore();
LOG.info("State store deleted");
} finally {
rmStore.stop();
}
}
} }

View File

@ -515,6 +515,13 @@ public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
deleteFile(nodeCreatePath); deleteFile(nodeCreatePath);
} }
@Override
public synchronized void deleteStore() throws IOException {
if (fs.exists(rootDirPath)) {
fs.delete(rootDirPath, true);
}
}
private Path getAppDir(Path root, String appId) { private Path getAppDir(Path root, String appId) {
return getNodePath(root, appId); return getNodePath(root, appId);
} }

View File

@ -267,4 +267,8 @@ protected RMStateVersion getCurrentVersion() {
return null; return null;
} }
@Override
public void deleteStore() throws Exception {
}
} }

View File

@ -138,4 +138,9 @@ protected RMStateVersion getCurrentVersion() {
return null; return null;
} }
@Override
public void deleteStore() throws Exception {
// Do nothing
}
} }

View File

@ -845,4 +845,10 @@ public void handle(RMStateStoreEvent event) {
handleStoreEvent(event); handleStoreEvent(event);
} }
} }
/**
* Derived classes must implement this method to delete the state store
* @throws Exception
*/
public abstract void deleteStore() throws Exception;
} }

View File

@ -805,6 +805,13 @@ protected synchronized void removeRMDTMasterKeyState(
} }
} }
@Override
public synchronized void deleteStore() throws Exception {
if (existsWithRetries(zkRootNodePath, true) != null) {
deleteWithRetries(zkRootNodePath, true);
}
}
// ZK related code // ZK related code
/** /**
* Watcher implementation which forward events to the ZKRMStateStore This * Watcher implementation which forward events to the ZKRMStateStore This
@ -959,6 +966,29 @@ Stat run() throws KeeperException, InterruptedException {
}.runWithRetries(); }.runWithRetries();
} }
private void deleteWithRetries(
final String path, final boolean watch) throws Exception {
new ZKAction<Void>() {
@Override
Void run() throws KeeperException, InterruptedException {
recursiveDeleteWithRetriesHelper(path, watch);
return null;
}
}.runWithRetries();
}
/**
* Helper method that deletes znodes recursively
*/
private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren(path, watch);
for (String child : children) {
recursiveDeleteWithRetriesHelper(path + "/" + child, false);
}
zkClient.delete(path, -1);
}
/** /**
* Helper class that periodically attempts creating a znode to ensure that * Helper class that periodically attempts creating a znode to ensure that
* this RM continues to be the Active. * this RM continues to be the Active.

View File

@ -515,26 +515,7 @@ public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
throws Exception { throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore(); RMStateStore store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(new TestDispatcher()); store.setRMDispatcher(new TestDispatcher());
// create and store apps ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5);
ArrayList<RMApp> appList = new ArrayList<RMApp>();
int NUM_APPS = 5;
for (int i = 0; i < NUM_APPS; i++) {
ApplicationId appId = ApplicationId.newInstance(1383183338, i);
RMApp app = storeApp(store, appId, 123456789, 987654321);
appList.add(app);
}
Assert.assertEquals(NUM_APPS, appList.size());
for (RMApp app : appList) {
// wait for app to be stored.
while (true) {
if (stateStoreHelper.appExists(app)) {
break;
} else {
Thread.sleep(100);
}
}
}
for (RMApp app : appList) { for (RMApp app : appList) {
// remove the app // remove the app
@ -550,6 +531,41 @@ public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
} }
} }
private ArrayList<RMApp> createAndStoreApps(
RMStateStoreHelper stateStoreHelper, RMStateStore store, int numApps)
throws Exception {
ArrayList<RMApp> appList = new ArrayList<RMApp>();
for (int i = 0; i < numApps; i++) {
ApplicationId appId = ApplicationId.newInstance(1383183338, i);
RMApp app = storeApp(store, appId, 123456789, 987654321);
appList.add(app);
}
Assert.assertEquals(numApps, appList.size());
for (RMApp app : appList) {
// wait for app to be stored.
while (true) {
if (stateStoreHelper.appExists(app)) {
break;
} else {
Thread.sleep(100);
}
}
}
return appList;
}
public void testDeleteStore(RMStateStoreHelper stateStoreHelper)
throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore();
ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5);
store.deleteStore();
// verify apps deleted
for (RMApp app : appList) {
Assert.assertFalse(stateStoreHelper.appExists(app));
}
}
protected void modifyAppState() throws Exception { protected void modifyAppState() throws Exception {
} }

View File

@ -160,6 +160,7 @@ public void testFSRMStateStore() throws Exception {
testCheckVersion(fsTester); testCheckVersion(fsTester);
testEpoch(fsTester); testEpoch(fsTester);
testAppDeletion(fsTester); testAppDeletion(fsTester);
testDeleteStore(fsTester);
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }

View File

@ -122,6 +122,7 @@ public void testZKRMStateStoreRealZK() throws Exception {
testCheckVersion(zkTester); testCheckVersion(zkTester);
testEpoch(zkTester); testEpoch(zkTester);
testAppDeletion(zkTester); testAppDeletion(zkTester);
testDeleteStore(zkTester);
} }
private Configuration createHARMConf( private Configuration createHARMConf(

View File

@ -157,9 +157,17 @@ Usage: yarn [--config confdir] COMMAND
Start the ResourceManager Start the ResourceManager
------- -------
Usage: yarn resourcemanager Usage: yarn resourcemanager [-format]
------- -------
*---------------+--------------+
|| COMMAND_OPTIONS || Description |
*---------------+--------------+
| -format | Formats the RMStateStore. This will clear the RMStateStore and is
| | useful if past applications are no longer needed. This should be run
| | only when the ResourceManager is not running.
*---------------+--------------+
** nodemanager ** nodemanager
Start the NodeManager Start the NodeManager