YARN-231. RM Restart - Add FS-based persistent store implementation for RMStateStore. Contributed by Bikas Saha
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1437245 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f5551bf809
commit
29ddce96a2
|
@ -37,6 +37,9 @@ Release 2.0.3-alpha - Unreleased
|
||||||
|
|
||||||
YARN-328. Use token request messages defined in hadoop common. (suresh)
|
YARN-328. Use token request messages defined in hadoop common. (suresh)
|
||||||
|
|
||||||
|
YARN-231. RM Restart - Add FS-based persistent store implementation for
|
||||||
|
RMStateStore (Bikas Saha via hitesh)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-223. Update process tree instead of getting new process trees.
|
YARN-223. Update process tree instead of getting new process trees.
|
||||||
|
|
|
@ -38,6 +38,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
public abstract class ApplicationAttemptId implements
|
public abstract class ApplicationAttemptId implements
|
||||||
Comparable<ApplicationAttemptId> {
|
Comparable<ApplicationAttemptId> {
|
||||||
|
|
||||||
|
public static final String appAttemptIdStrPrefix = "appattempt_";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the <code>ApplicationId</code> of the <code>ApplicationAttempId</code>.
|
* Get the <code>ApplicationId</code> of the <code>ApplicationAttempId</code>.
|
||||||
* @return <code>ApplicationId</code> of the <code>ApplicationAttempId</code>
|
* @return <code>ApplicationId</code> of the <code>ApplicationAttempId</code>
|
||||||
|
@ -111,11 +113,11 @@ public abstract class ApplicationAttemptId implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder sb = new StringBuilder("appattempt_");
|
StringBuilder sb = new StringBuilder(appAttemptIdStrPrefix);
|
||||||
sb.append(this.getApplicationId().getClusterTimestamp()).append("_");
|
sb.append(this.getApplicationId().getClusterTimestamp()).append("_");
|
||||||
sb.append(ApplicationId.appIdFormat.get().format(
|
sb.append(ApplicationId.appIdFormat.get().format(
|
||||||
this.getApplicationId().getId()));
|
this.getApplicationId().getId()));
|
||||||
sb.append("_").append(attemptIdFormat.get().format(getAttemptId()));
|
sb.append("_").append(attemptIdFormat.get().format(getAttemptId()));
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
@Stable
|
@Stable
|
||||||
public abstract class ApplicationId implements Comparable<ApplicationId> {
|
public abstract class ApplicationId implements Comparable<ApplicationId> {
|
||||||
|
|
||||||
|
public static final String appIdStrPrefix = "application_";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the short integer identifier of the <code>ApplicationId</code>
|
* Get the short integer identifier of the <code>ApplicationId</code>
|
||||||
* which is unique for all applications started by a particular instance
|
* which is unique for all applications started by a particular instance
|
||||||
|
@ -88,7 +90,7 @@ public abstract class ApplicationId implements Comparable<ApplicationId> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "application_" + this.getClusterTimestamp() + "_"
|
return appIdStrPrefix + this.getClusterTimestamp() + "_"
|
||||||
+ appIdFormat.get().format(getId());
|
+ appIdFormat.get().format(getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,4 +121,4 @@ public abstract class ApplicationId implements Comparable<ApplicationId> {
|
||||||
return false;
|
return false;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -236,6 +236,10 @@ public class YarnConfiguration extends Configuration {
|
||||||
/** The class to use as the persistent store.*/
|
/** The class to use as the persistent store.*/
|
||||||
public static final String RM_STORE = RM_PREFIX + "store.class";
|
public static final String RM_STORE = RM_PREFIX + "store.class";
|
||||||
|
|
||||||
|
/** URI for FileSystemRMStateStore */
|
||||||
|
public static final String FS_RM_STATE_STORE_URI =
|
||||||
|
RM_PREFIX + "fs.rm-state-store.uri";
|
||||||
|
|
||||||
/** The maximum number of completed applications RM keeps. */
|
/** The maximum number of completed applications RM keeps. */
|
||||||
public static final String RM_MAX_COMPLETED_APPLICATIONS =
|
public static final String RM_MAX_COMPLETED_APPLICATIONS =
|
||||||
RM_PREFIX + "max-completed-applications";
|
RM_PREFIX + "max-completed-applications";
|
||||||
|
|
|
@ -230,6 +230,17 @@
|
||||||
<property>
|
<property>
|
||||||
<description>The class to use as the persistent store.</description>
|
<description>The class to use as the persistent store.</description>
|
||||||
<name>yarn.resourcemanager.store.class</name>
|
<name>yarn.resourcemanager.store.class</name>
|
||||||
|
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>URI pointing to the location of the FileSystem path where
|
||||||
|
RM state will be stored. This must be supplied when using
|
||||||
|
org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
|
||||||
|
as the value for yarn.resourcemanager.store.class</description>
|
||||||
|
<name>yarn.resourcemanager.fs.rm-state-store.uri</name>
|
||||||
|
<value>${hadoop.tmp.dir}/yarn/system/rmstore</value>
|
||||||
|
<!--value>hdfs://localhost:9000/rmstore</value-->
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -41,6 +41,12 @@
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
|
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-hdfs</artifactId>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -0,0 +1,233 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto;
|
||||||
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
/**
|
||||||
|
* A simple class for storing RM state in any storage that implements a basic
|
||||||
|
* FileSystem interface. Does not use directories so that simple key-value
|
||||||
|
* stores can be used. The retry policy for the real filesystem client must be
|
||||||
|
* configured separately to enable retry of filesystem operations when needed.
|
||||||
|
*/
|
||||||
|
public class FileSystemRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
|
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
|
||||||
|
|
||||||
|
private static final String ROOT_DIR_NAME = "FSRMStateRoot";
|
||||||
|
|
||||||
|
|
||||||
|
private FileSystem fs;
|
||||||
|
|
||||||
|
private Path fsRootDirPath;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Path fsWorkingPath;
|
||||||
|
|
||||||
|
public synchronized void initInternal(Configuration conf)
|
||||||
|
throws Exception{
|
||||||
|
|
||||||
|
fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI));
|
||||||
|
fsRootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
|
||||||
|
|
||||||
|
// create filesystem
|
||||||
|
fs = fsWorkingPath.getFileSystem(conf);
|
||||||
|
fs.mkdirs(fsRootDirPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized void closeInternal() throws Exception {
|
||||||
|
fs.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized RMState loadState() throws Exception {
|
||||||
|
try {
|
||||||
|
RMState state = new RMState();
|
||||||
|
FileStatus[] childNodes = fs.listStatus(fsRootDirPath);
|
||||||
|
List<ApplicationAttemptState> attempts =
|
||||||
|
new ArrayList<ApplicationAttemptState>();
|
||||||
|
for(FileStatus childNodeStatus : childNodes) {
|
||||||
|
assert childNodeStatus.isFile();
|
||||||
|
String childNodeName = childNodeStatus.getPath().getName();
|
||||||
|
Path childNodePath = getNodePath(childNodeName);
|
||||||
|
byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
|
||||||
|
if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
|
||||||
|
// application
|
||||||
|
LOG.info("Loading application from node: " + childNodeName);
|
||||||
|
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
|
||||||
|
ApplicationStateDataPBImpl appStateData =
|
||||||
|
new ApplicationStateDataPBImpl(
|
||||||
|
ApplicationStateDataProto.parseFrom(childData));
|
||||||
|
ApplicationState appState = new ApplicationState(
|
||||||
|
appStateData.getSubmitTime(),
|
||||||
|
appStateData.getApplicationSubmissionContext());
|
||||||
|
// assert child node name is same as actual applicationId
|
||||||
|
assert appId.equals(appState.context.getApplicationId());
|
||||||
|
state.appState.put(appId, appState);
|
||||||
|
} else if(childNodeName.startsWith(
|
||||||
|
ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
||||||
|
// attempt
|
||||||
|
LOG.info("Loading application attempt from node: " + childNodeName);
|
||||||
|
ApplicationAttemptId attemptId =
|
||||||
|
ConverterUtils.toApplicationAttemptId(childNodeName);
|
||||||
|
ApplicationAttemptStateDataPBImpl attemptStateData =
|
||||||
|
new ApplicationAttemptStateDataPBImpl(
|
||||||
|
ApplicationAttemptStateDataProto.parseFrom(childData));
|
||||||
|
ApplicationAttemptState attemptState = new ApplicationAttemptState(
|
||||||
|
attemptId, attemptStateData.getMasterContainer());
|
||||||
|
// assert child node name is same as application attempt id
|
||||||
|
assert attemptId.equals(attemptState.getAttemptId());
|
||||||
|
attempts.add(attemptState);
|
||||||
|
} else {
|
||||||
|
LOG.info("Unknown child node with name: " + childNodeName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// go through all attempts and add them to their apps
|
||||||
|
for(ApplicationAttemptState attemptState : attempts) {
|
||||||
|
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
|
||||||
|
ApplicationState appState = state.appState.get(appId);
|
||||||
|
if(appState != null) {
|
||||||
|
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
||||||
|
} else {
|
||||||
|
// the application node may have been removed when the application
|
||||||
|
// completed but the RM might have stopped before it could remove the
|
||||||
|
// application attempt nodes
|
||||||
|
LOG.info("Application node not found for attempt: "
|
||||||
|
+ attemptState.getAttemptId());
|
||||||
|
deleteFile(getNodePath(attemptState.getAttemptId().toString()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return state;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Failed to load state.", e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void storeApplicationState(String appId,
|
||||||
|
ApplicationStateDataPBImpl appStateDataPB)
|
||||||
|
throws Exception {
|
||||||
|
Path nodeCreatePath = getNodePath(appId);
|
||||||
|
|
||||||
|
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
|
||||||
|
byte[] appStateData = appStateDataPB.getProto().toByteArray();
|
||||||
|
try {
|
||||||
|
// currently throw all exceptions. May need to respond differently for HA
|
||||||
|
// based on whether we have lost the right to write to FS
|
||||||
|
writeFile(nodeCreatePath, appStateData);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Error storing info for app: " + appId, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void storeApplicationAttemptState(String attemptId,
|
||||||
|
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
|
||||||
|
throws Exception {
|
||||||
|
Path nodeCreatePath = getNodePath(attemptId);
|
||||||
|
LOG.info("Storing info for attempt: " + attemptId
|
||||||
|
+ " at: " + nodeCreatePath);
|
||||||
|
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
|
||||||
|
try {
|
||||||
|
// currently throw all exceptions. May need to respond differently for HA
|
||||||
|
// based on whether we have lost the right to write to FS
|
||||||
|
writeFile(nodeCreatePath, attemptStateData);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Error storing info for attempt: " + attemptId, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void removeApplicationState(ApplicationState appState)
|
||||||
|
throws Exception {
|
||||||
|
String appId = appState.getAppId().toString();
|
||||||
|
Path nodeRemovePath = getNodePath(appId);
|
||||||
|
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
|
||||||
|
deleteFile(nodeRemovePath);
|
||||||
|
for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
|
||||||
|
removeApplicationAttemptState(attemptId.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void removeApplicationAttemptState(String attemptId)
|
||||||
|
throws Exception {
|
||||||
|
Path nodeRemovePath = getNodePath(attemptId);
|
||||||
|
LOG.info("Removing info for attempt: " + attemptId
|
||||||
|
+ " at: " + nodeRemovePath);
|
||||||
|
deleteFile(nodeRemovePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
// FileSystem related code
|
||||||
|
|
||||||
|
private void deleteFile(Path deletePath) throws Exception {
|
||||||
|
if(!fs.delete(deletePath, true)) {
|
||||||
|
throw new Exception("Failed to delete " + deletePath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] readFile(Path inputPath, long len) throws Exception {
|
||||||
|
FSDataInputStream fsIn = fs.open(inputPath);
|
||||||
|
// state data will not be that "long"
|
||||||
|
byte[] data = new byte[(int)len];
|
||||||
|
fsIn.readFully(data);
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeFile(Path outputPath, byte[] data) throws Exception {
|
||||||
|
FSDataOutputStream fsOut = fs.create(outputPath, false);
|
||||||
|
fsOut.write(data);
|
||||||
|
fsOut.flush();
|
||||||
|
fsOut.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Path getNodePath(String nodeName) {
|
||||||
|
return new Path(fsRootDirPath, nodeName);
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,10 +18,12 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
|
|
||||||
|
@Unstable
|
||||||
public class NullRMStateStore extends RMStateStore {
|
public class NullRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -36,7 +38,7 @@ public class NullRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RMState loadState() throws Exception {
|
public RMState loadState() throws Exception {
|
||||||
return null;
|
throw new UnsupportedOperationException("Cannot load state from null store");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,284 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
|
||||||
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
|
public class TestRMStateStore {
|
||||||
|
|
||||||
|
public static final Log LOG = LogFactory.getLog(TestRMStateStore.class);
|
||||||
|
|
||||||
|
class TestDispatcher implements Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
|
||||||
|
|
||||||
|
ApplicationAttemptId attemptId;
|
||||||
|
Exception storedException;
|
||||||
|
|
||||||
|
boolean notified = false;
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
@Override
|
||||||
|
public void register(Class<? extends Enum> eventType, EventHandler handler) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(RMAppAttemptStoredEvent event) {
|
||||||
|
assertEquals(attemptId, event.getApplicationAttemptId());
|
||||||
|
assertEquals(storedException, event.getStoredException());
|
||||||
|
notified = true;
|
||||||
|
synchronized (this) {
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
@Override
|
||||||
|
public EventHandler getEventHandler() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
interface RMStateStoreHelper {
|
||||||
|
RMStateStore getRMStateStore() throws Exception;
|
||||||
|
void addOrphanAttemptIfNeeded(RMStateStore testStore,
|
||||||
|
TestDispatcher dispatcher) throws Exception;
|
||||||
|
boolean isFinalStateValid() throws Exception;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFSRMStateStore() throws Exception {
|
||||||
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
try {
|
||||||
|
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
|
||||||
|
testRMStateStore(fsTester);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class TestFSRMStateStoreTester implements RMStateStoreHelper {
|
||||||
|
Path workingDirPathURI;
|
||||||
|
FileSystemRMStateStore store;
|
||||||
|
MiniDFSCluster cluster;
|
||||||
|
|
||||||
|
class TestFileSystemRMStore extends FileSystemRMStateStore {
|
||||||
|
TestFileSystemRMStore(Configuration conf) throws Exception {
|
||||||
|
init(conf);
|
||||||
|
assertTrue(workingDirPathURI.equals(fsWorkingPath));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
|
||||||
|
Path workingDirPath = new Path("/Test");
|
||||||
|
this.cluster = cluster;
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
fs.mkdirs(workingDirPath);
|
||||||
|
Path clusterURI = new Path(cluster.getURI());
|
||||||
|
workingDirPathURI = new Path(clusterURI, workingDirPath);
|
||||||
|
fs.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RMStateStore getRMStateStore() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, workingDirPathURI.toString());
|
||||||
|
this.store = new TestFileSystemRMStore(conf);
|
||||||
|
return store;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addOrphanAttemptIfNeeded(RMStateStore testStore,
|
||||||
|
TestDispatcher dispatcher) throws Exception {
|
||||||
|
ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(
|
||||||
|
"appattempt_1352994193343_0003_000001");
|
||||||
|
storeAttempt(testStore, attemptId,
|
||||||
|
"container_1352994193343_0003_01_000001", dispatcher);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFinalStateValid() throws Exception {
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
FileStatus[] files = fs.listStatus(workingDirPathURI);
|
||||||
|
if(files.length == 1) {
|
||||||
|
// only store root directory should exist
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void waitNotify(TestDispatcher dispatcher) {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
while(!dispatcher.notified) {
|
||||||
|
synchronized (dispatcher) {
|
||||||
|
try {
|
||||||
|
dispatcher.wait(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(System.currentTimeMillis() - startTime > 1000*60) {
|
||||||
|
fail("Timed out attempt store notification");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dispatcher.notified = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void storeApp(RMStateStore store, ApplicationId appId, long time)
|
||||||
|
throws Exception {
|
||||||
|
ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl();
|
||||||
|
context.setApplicationId(appId);
|
||||||
|
|
||||||
|
RMApp mockApp = mock(RMApp.class);
|
||||||
|
when(mockApp.getApplicationId()).thenReturn(appId);
|
||||||
|
when(mockApp.getSubmitTime()).thenReturn(time);
|
||||||
|
when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
|
||||||
|
store.storeApplication(mockApp);
|
||||||
|
}
|
||||||
|
|
||||||
|
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
|
||||||
|
String containerIdStr, TestDispatcher dispatcher)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
Container container = new ContainerPBImpl();
|
||||||
|
container.setId(ConverterUtils.toContainerId(containerIdStr));
|
||||||
|
RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
|
||||||
|
when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
|
||||||
|
when(mockAttempt.getMasterContainer()).thenReturn(container);
|
||||||
|
dispatcher.attemptId = attemptId;
|
||||||
|
dispatcher.storedException = null;
|
||||||
|
store.storeApplicationAttempt(mockAttempt);
|
||||||
|
waitNotify(dispatcher);
|
||||||
|
return container.getId();
|
||||||
|
}
|
||||||
|
|
||||||
|
void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
|
||||||
|
long submitTime = System.currentTimeMillis();
|
||||||
|
RMStateStore store = stateStoreHelper.getRMStateStore();
|
||||||
|
TestDispatcher dispatcher = new TestDispatcher();
|
||||||
|
store.setDispatcher(dispatcher);
|
||||||
|
|
||||||
|
ApplicationAttemptId attemptId1 = ConverterUtils
|
||||||
|
.toApplicationAttemptId("appattempt_1352994193343_0001_000001");
|
||||||
|
ApplicationId appId1 = attemptId1.getApplicationId();
|
||||||
|
storeApp(store, appId1, submitTime);
|
||||||
|
ContainerId containerId1 = storeAttempt(store, attemptId1,
|
||||||
|
"container_1352994193343_0001_01_000001", dispatcher);
|
||||||
|
String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
|
||||||
|
ApplicationAttemptId attemptId2 =
|
||||||
|
ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
|
||||||
|
ContainerId containerId2 = storeAttempt(store, attemptId2,
|
||||||
|
"container_1352994193343_0001_02_000001", dispatcher);
|
||||||
|
|
||||||
|
ApplicationAttemptId attemptIdRemoved = ConverterUtils
|
||||||
|
.toApplicationAttemptId("appattempt_1352994193343_0002_000001");
|
||||||
|
ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
|
||||||
|
storeApp(store, appIdRemoved, submitTime);
|
||||||
|
storeAttempt(store, attemptIdRemoved,
|
||||||
|
"container_1352994193343_0002_01_000001", dispatcher);
|
||||||
|
|
||||||
|
RMApp mockRemovedApp = mock(RMApp.class);
|
||||||
|
HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
|
||||||
|
new HashMap<ApplicationAttemptId, RMAppAttempt>();
|
||||||
|
ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl();
|
||||||
|
context.setApplicationId(appIdRemoved);
|
||||||
|
when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
|
||||||
|
when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
|
||||||
|
when(mockRemovedApp.getAppAttempts()).thenReturn(attempts);
|
||||||
|
RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class);
|
||||||
|
when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved);
|
||||||
|
attempts.put(attemptIdRemoved, mockRemovedAttempt);
|
||||||
|
store.removeApplication(mockRemovedApp);
|
||||||
|
|
||||||
|
// add orphan attempt file to simulate incomplete removal of app state
|
||||||
|
stateStoreHelper.addOrphanAttemptIfNeeded(store, dispatcher);
|
||||||
|
|
||||||
|
// let things settle down
|
||||||
|
Thread.sleep(1000);
|
||||||
|
store.close();
|
||||||
|
|
||||||
|
// load state
|
||||||
|
store = stateStoreHelper.getRMStateStore();
|
||||||
|
RMState state = store.loadState();
|
||||||
|
Map<ApplicationId, ApplicationState> rmAppState = state.getApplicationState();
|
||||||
|
|
||||||
|
// removed app or orphan attempt is not loaded
|
||||||
|
assertEquals(1, rmAppState.size());
|
||||||
|
|
||||||
|
ApplicationState appState = rmAppState.get(appId1);
|
||||||
|
// app is loaded
|
||||||
|
assertNotNull(appState);
|
||||||
|
// app is loaded correctly
|
||||||
|
assertEquals(submitTime, appState.getSubmitTime());
|
||||||
|
// submission context is loaded correctly
|
||||||
|
assertEquals(appId1,
|
||||||
|
appState.getApplicationSubmissionContext().getApplicationId());
|
||||||
|
ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
|
||||||
|
// attempt1 is loaded correctly
|
||||||
|
assertNotNull(attemptState);
|
||||||
|
assertEquals(attemptId1, attemptState.getAttemptId());
|
||||||
|
// attempt1 container is loaded correctly
|
||||||
|
assertEquals(containerId1, attemptState.getMasterContainer().getId());
|
||||||
|
attemptState = appState.getAttempt(attemptId2);
|
||||||
|
// attempt2 is loaded correctly
|
||||||
|
assertNotNull(attemptState);
|
||||||
|
assertEquals(attemptId2, attemptState.getAttemptId());
|
||||||
|
// attempt2 container is loaded correctly
|
||||||
|
assertEquals(containerId2, attemptState.getMasterContainer().getId());
|
||||||
|
|
||||||
|
// assert store is in expected state after everything is cleaned
|
||||||
|
assertTrue(stateStoreHelper.isFinalStateValid());
|
||||||
|
|
||||||
|
store.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue