YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1609278 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-07-09 19:58:43 +00:00
parent d751a61e5a
commit 8fbca62a90
12 changed files with 146 additions and 41 deletions

View File

@ -24,6 +24,8 @@ Release 2.6.0 - UNRELEASED
NEW FEATURES
YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha)
IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu

View File

@ -61,6 +61,7 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
function print_usage(){
echo "Usage: yarn [--config confdir] COMMAND"
echo "where COMMAND is one of:"
echo " resourcemanager -format deletes the RMStateStore"
echo " resourcemanager run the ResourceManager"
echo " nodemanager run a nodemanager on each slave"
echo " timelineserver run the timeline server"

View File

@ -1035,12 +1035,17 @@ public class ResourceManager extends CompositeService implements Recoverable {
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try {
Configuration conf = new YarnConfiguration();
// If -format, then delete RMStateStore; else startup normally
if (argv.length == 1 && argv[0].equals("-format")) {
deleteRMStateStore(conf);
} else {
ResourceManager resourceManager = new ResourceManager();
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY);
resourceManager.init(conf);
resourceManager.start();
}
} catch (Throwable t) {
LOG.fatal("Error starting ResourceManager", t);
System.exit(-1);
@ -1077,4 +1082,23 @@ public class ResourceManager extends CompositeService implements Recoverable {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
}
/**
* Deletes the RMStateStore
*
* @param conf
* @throws Exception
*/
private static void deleteRMStateStore(Configuration conf) throws Exception {
RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
rmStore.init(conf);
rmStore.start();
try {
LOG.info("Deleting ResourceManager state store...");
rmStore.deleteStore();
LOG.info("State store deleted");
} finally {
rmStore.stop();
}
}
}

View File

@ -515,6 +515,13 @@ public class FileSystemRMStateStore extends RMStateStore {
deleteFile(nodeCreatePath);
}
@Override
public synchronized void deleteStore() throws IOException {
if (fs.exists(rootDirPath)) {
fs.delete(rootDirPath, true);
}
}
private Path getAppDir(Path root, String appId) {
return getNodePath(root, appId);
}

View File

@ -267,4 +267,8 @@ public class MemoryRMStateStore extends RMStateStore {
return null;
}
@Override
public void deleteStore() throws Exception {
}
}

View File

@ -138,4 +138,9 @@ public class NullRMStateStore extends RMStateStore {
return null;
}
@Override
public void deleteStore() throws Exception {
// Do nothing
}
}

View File

@ -845,4 +845,10 @@ public abstract class RMStateStore extends AbstractService {
handleStoreEvent(event);
}
}
/**
* Derived classes must implement this method to delete the state store
* @throws Exception
*/
public abstract void deleteStore() throws Exception;
}

View File

@ -805,6 +805,13 @@ public class ZKRMStateStore extends RMStateStore {
}
}
@Override
public synchronized void deleteStore() throws Exception {
if (existsWithRetries(zkRootNodePath, true) != null) {
deleteWithRetries(zkRootNodePath, true);
}
}
// ZK related code
/**
* Watcher implementation which forward events to the ZKRMStateStore This
@ -959,6 +966,29 @@ public class ZKRMStateStore extends RMStateStore {
}.runWithRetries();
}
private void deleteWithRetries(
final String path, final boolean watch) throws Exception {
new ZKAction<Void>() {
@Override
Void run() throws KeeperException, InterruptedException {
recursiveDeleteWithRetriesHelper(path, watch);
return null;
}
}.runWithRetries();
}
/**
* Helper method that deletes znodes recursively
*/
private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren(path, watch);
for (String child : children) {
recursiveDeleteWithRetriesHelper(path + "/" + child, false);
}
zkClient.delete(path, -1);
}
/**
* Helper class that periodically attempts creating a znode to ensure that
* this RM continues to be the Active.

View File

@ -515,26 +515,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(new TestDispatcher());
// create and store apps
ArrayList<RMApp> appList = new ArrayList<RMApp>();
int NUM_APPS = 5;
for (int i = 0; i < NUM_APPS; i++) {
ApplicationId appId = ApplicationId.newInstance(1383183338, i);
RMApp app = storeApp(store, appId, 123456789, 987654321);
appList.add(app);
}
Assert.assertEquals(NUM_APPS, appList.size());
for (RMApp app : appList) {
// wait for app to be stored.
while (true) {
if (stateStoreHelper.appExists(app)) {
break;
} else {
Thread.sleep(100);
}
}
}
ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5);
for (RMApp app : appList) {
// remove the app
@ -550,6 +531,41 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
}
}
private ArrayList<RMApp> createAndStoreApps(
RMStateStoreHelper stateStoreHelper, RMStateStore store, int numApps)
throws Exception {
ArrayList<RMApp> appList = new ArrayList<RMApp>();
for (int i = 0; i < numApps; i++) {
ApplicationId appId = ApplicationId.newInstance(1383183338, i);
RMApp app = storeApp(store, appId, 123456789, 987654321);
appList.add(app);
}
Assert.assertEquals(numApps, appList.size());
for (RMApp app : appList) {
// wait for app to be stored.
while (true) {
if (stateStoreHelper.appExists(app)) {
break;
} else {
Thread.sleep(100);
}
}
}
return appList;
}
public void testDeleteStore(RMStateStoreHelper stateStoreHelper)
throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore();
ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5);
store.deleteStore();
// verify apps deleted
for (RMApp app : appList) {
Assert.assertFalse(stateStoreHelper.appExists(app));
}
}
protected void modifyAppState() throws Exception {
}

View File

@ -160,6 +160,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
testCheckVersion(fsTester);
testEpoch(fsTester);
testAppDeletion(fsTester);
testDeleteStore(fsTester);
} finally {
cluster.shutdown();
}

View File

@ -122,6 +122,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
testCheckVersion(zkTester);
testEpoch(zkTester);
testAppDeletion(zkTester);
testDeleteStore(zkTester);
}
private Configuration createHARMConf(

View File

@ -157,9 +157,17 @@ Usage: yarn [--config confdir] COMMAND
Start the ResourceManager
-------
Usage: yarn resourcemanager
Usage: yarn resourcemanager [-format]
-------
*---------------+--------------+
|| COMMAND_OPTIONS || Description |
*---------------+--------------+
| -format | Formats the RMStateStore. This will clear the RMStateStore and is
| | useful if past applications are no longer needed. This should be run
| | only when the ResourceManager is not running.
*---------------+--------------+
** nodemanager
Start the NodeManager