Merge r1555021 through r1566358 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1567874 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2014-02-13 09:53:52 +00:00
commit a4f152db96
37 changed files with 1805 additions and 119 deletions

View File

@ -348,6 +348,11 @@ Release 2.4.0 - UNRELEASED
HDFS-5807. TestBalancerWithNodeGroup.testBalancerWithNodeGroup fails
intermittently. (Chen He via kihwal)
HDFS-5882. TestAuditLogs is flaky (jxiang via cmccabe)
HDFS-5900. Cannot set cache pool limit of "unlimited" via CacheAdmin.
(wang)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -140,6 +140,18 @@ public class CacheAdmin extends Configured implements Tool {
return maxTtl;
}
private static Long parseLimitString(String limitString) {
Long limit = null;
if (limitString != null) {
if (limitString.equalsIgnoreCase("unlimited")) {
limit = CachePoolInfo.LIMIT_UNLIMITED;
} else {
limit = Long.parseLong(limitString);
}
}
return limit;
}
private static Expiration parseExpirationString(String ttlString)
throws IOException {
Expiration ex = null;
@ -650,8 +662,8 @@ public class CacheAdmin extends Configured implements Tool {
info.setMode(new FsPermission(mode));
}
String limitString = StringUtils.popOptionWithArgument("-limit", args);
- if (limitString != null) {
- long limit = Long.parseLong(limitString);
+ Long limit = parseLimitString(limitString);
+ if (limit != null) {
info.setLimit(limit);
}
String maxTtlString = StringUtils.popOptionWithArgument("-maxTtl", args);
@ -726,8 +738,7 @@ public class CacheAdmin extends Configured implements Tool {
Integer mode = (modeString == null) ?
null : Integer.parseInt(modeString, 8);
String limitString = StringUtils.popOptionWithArgument("-limit", args);
- Long limit = (limitString == null) ?
- null : Long.parseLong(limitString);
+ Long limit = parseLimitString(limitString);
String maxTtlString = StringUtils.popOptionWithArgument("-maxTtl", args);
Long maxTtl = null;
try {
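Both -addPool and -modifyPool now funnel their -limit argument through this helper, so "unlimited" maps to CachePoolInfo.LIMIT_UNLIMITED and an absent flag stays null ("leave unchanged"). A minimal standalone sketch of that behavior; the class name is illustrative, and LIMIT_UNLIMITED is assumed to mirror CachePoolInfo.LIMIT_UNLIMITED (Long.MAX_VALUE):

public class ParseLimitSketch {
  // assumption: stands in for CachePoolInfo.LIMIT_UNLIMITED
  static final long LIMIT_UNLIMITED = Long.MAX_VALUE;

  static Long parseLimitString(String limitString) {
    Long limit = null;
    if (limitString != null) {
      if (limitString.equalsIgnoreCase("unlimited")) {
        limit = LIMIT_UNLIMITED;
      } else {
        limit = Long.parseLong(limitString);
      }
    }
    return limit;
  }

  public static void main(String[] args) {
    System.out.println(parseLimitString("unlimited")); // 9223372036854775807
    System.out.println(parseLimitString("10"));        // 10
    System.out.println(parseLimitString(null));        // null: flag not given
  }
}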

View File

@ -28,6 +28,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.regex.Pattern;
@ -301,11 +302,18 @@ public class TestAuditLogs {
// Turn off the logs
Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
logger.setLevel(Level.OFF);
// Close the appenders and force all logs to be flushed
Enumeration<?> appenders = logger.getAllAppenders();
while (appenders.hasMoreElements()) {
Appender appender = (Appender)appenders.nextElement();
appender.close();
}
BufferedReader reader = new BufferedReader(new FileReader(auditLogFile));
String line = null;
boolean ret = true;
try {
for (int i = 0; i < ndupe; i++) {
line = reader.readLine();
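Closing the appenders matters because log4j buffers output: forcing everything to disk before the BufferedReader opens the audit file removes the race that made this test flaky. A generic sketch of the same flush-before-read pattern, assuming plain log4j 1.x:

import java.util.Enumeration;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;

public class FlushAppendersSketch {
  // Close every appender on the logger so buffered records reach disk;
  // a file opened for reading afterwards sees all logged lines.
  static void closeAllAppenders(Logger logger) {
    Enumeration<?> appenders = logger.getAllAppenders();
    while (appenders.hasMoreElements()) {
      ((Appender) appenders.nextElement()).close();
    }
  }
}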

View File

@ -469,6 +469,8 @@
</test-commands>
<cleanup-commands>
<cache-admin-command>-removePool pool1</cache-admin-command>
<cache-admin-command>-removePool pool2</cache-admin-command>
<cache-admin-command>-removePool pool3</cache-admin-command>
</cleanup-commands>
<comparators>
<comparator>
@ -489,5 +491,33 @@
</comparator>
</comparators>
</test>
<test> <!--Tested -->
<description>Testing setting pool unlimited limits</description>
<test-commands>
<cache-admin-command>-addPool pool1 -limit unlimited -owner andrew -group andrew</cache-admin-command>
<cache-admin-command>-addPool pool2 -limit 10 -owner andrew -group andrew</cache-admin-command>
<cache-admin-command>-modifyPool pool2 -limit unlimited</cache-admin-command>
<cache-admin-command>-listPools</cache-admin-command>
</test-commands>
<cleanup-commands>
<cache-admin-command>-removePool pool1</cache-admin-command>
<cache-admin-command>-removePool pool2</cache-admin-command>
</cleanup-commands>
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>Found 2 results</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>pool1 andrew andrew rwxr-xr-x unlimited never</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>pool2 andrew andrew rwxr-xr-x unlimited never</expected-output>
</comparator>
</comparators>
</test>
</tests>
</configuration>

View File

@ -400,4 +400,10 @@ public class ResourceMgrDelegate extends YarnClient {
IOException {
return client.getContainers(applicationAttemptId);
}
@Override
public void moveApplicationAcrossQueues(ApplicationId appId, String queue)
throws YarnException, IOException {
client.moveApplicationAcrossQueues(appId, queue);
}
}

View File

@ -784,6 +784,12 @@
<artifactId>grizzly-http-servlet</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -17,6 +17,9 @@ Trunk - Unreleased
YARN-1499. Fair Scheduler changes for moving apps between queues (Sandy
Ryza)
+ YARN-1497. Command line additions for moving apps between queues (Sandy
+ Ryza)
IMPROVEMENTS
OPTIMIZATIONS
@ -103,6 +106,19 @@ Release 2.4.0 - UNRELEASED
new APIs for retrieving and storing timeline information. (Zhijie Shen via
vinodkv)
+ YARN-1490. Introduced the ability to make ResourceManager optionally not kill
+ all containers when an ApplicationMaster exits. (Jian He via vinodkv)
+ YARN-1041. Added the ApplicationMasterProtocol API for applications to use the
+ ability in ResourceManager to optionally not kill containers when the
+ ApplicationMaster exits. (Jian He via vinodkv)
+ YARN-1566. Changed Distributed Shell to retain containers across application
+ attempts. (Jian He via vinodkv)
+ YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. (Billie
+ Rinaldi via zjshen)
IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
@ -139,6 +155,17 @@ Release 2.4.0 - UNRELEASED
be available across RM failover by making use of a remote
configuration-provider. (Xuan Gong via vinodkv)
+ YARN-1665. Simplify the configuration of RM HA by having better default
+ values. (Xuan Gong via vinodkv)
+ YARN-1660. Simplified the RM HA configuration to accept and be able to simply
+ depend just on configuration properties of the form
+ yarn.resourcemanager.hostname.RMID and use the default ports for all service
+ addresses. (Xuan Gong via vinodkv)
+ YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
+ app-attempts separately from apps. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES
@ -194,6 +221,18 @@ Release 2.4.0 - UNRELEASED
YARN-1684. Fixed history server heap size in yarn script. (Billie Rinaldi
via zjshen)
+ YARN-1166. Fixed app-specific and attempt-specific QueueMetrics to be
+ triggered by accordingly app event and attempt event.
+ YARN-1689. Made RMAppAttempt get killed when RMApp is at ACCEPTED. (Vinod
+ Kumar Vavilapalli via zjshen)
+ YARN-1661. Fixed DS ApplicationMaster to write the correct exit log. (Vinod
+ Kumar Vavilapalli via zjshen)
+ YARN-1672. YarnConfiguration is missing a default for
+ yarn.nodemanager.log.retain-seconds (Naren Koneru via kasha)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -237,18 +276,8 @@ Release 2.3.0 - UNRELEASED
YARN-1029. Added embedded leader election in the ResourceManager. (Karthik
Kambatla via vinodkv)
- YARN-1490. Introduced the ability to make ResourceManager optionally not kill
- all containers when an ApplicationMaster exits. (Jian He via vinodkv)
YARN-1033. Expose RM active/standby state to Web UI and REST API (kasha)
- YARN-1041. Added the ApplicationMasterProtocol API for applications to use the
- ability in ResourceManager to optionally not kill containers when the
- ApplicationMaster exits. (Jian He via vinodkv)
- YARN-1566. Changed Distributed Shell to retain containers across application
- attempts. (Jian He via vinodkv)
IMPROVEMENTS
YARN-305. Fair scheduler logs too many "Node offered to app" messages.
@ -412,9 +441,6 @@ Release 2.3.0 - UNRELEASED
YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port
information once an AM crashes. (Jian He via vinodkv)
- YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
- app-attempts separately from apps. (Jian He via vinodkv)
YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager
fail-over. (Xuan Gong via vinodkv)
@ -600,9 +626,6 @@ Release 2.3.0 - UNRELEASED
YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong
via kasha)
- YARN-1166. Fixed app-specific and attempt-specific QueueMetrics to be
- triggered by accordingly app event and attempt event.
YARN-1598. HA-related rmadmin commands don't work on a secure cluster (kasha)
YARN-1603. Remove two *.orig files which were unexpectedly committed.
@ -630,12 +653,6 @@ Release 2.3.0 - UNRELEASED
YARN-1628. Fixed the test failure in TestContainerManagerSecurity. (Vinod
Kumar Vavilapalli via zjshen)
- YARN-1661. Fixed DS ApplicationMaster to write the correct exit log. (Vinod
- Kumar Vavilapalli via zjshen)
- YARN-1689. Made RMAppAttempt get killed when RMApp is at ACCEPTED. (Vinod
- Kumar Vavilapalli via zjshen)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -94,9 +94,21 @@ public class ATSPutErrors {
@Public
@Unstable
public static class ATSPutError {
+ /**
+ * Error code returned when no start time can be found when putting an
+ * entity. This occurs when the entity does not already exist in the
+ * store and it is put with no start time or events specified.
+ */
+ public static final int NO_START_TIME = 1;
+ /**
+ * Error code returned if an IOException is encountered when putting an
+ * entity.
+ */
+ public static final int IO_EXCEPTION = 2;
private String entityId;
private String entityType;
- private Integer errorCode;
+ private int errorCode;
/**
* Get the entity Id
@ -144,7 +156,7 @@ public class ATSPutErrors {
* @return an error code
*/
@XmlElement(name = "errorcode")
- public Integer getErrorCode() {
+ public int getErrorCode() {
return errorCode;
}
@ -154,7 +166,7 @@ public class ATSPutErrors {
* @param errorCode
* an error code
*/
- public void setErrorCode(Integer errorCode) {
+ public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}

View File

@ -101,20 +101,7 @@ public class HAUtil {
for (String id: ids) {
// verify the RM service addresses configurations for every RMIds
for (String prefix : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
- String confKey = null;
- try {
- confKey = addSuffix(prefix, id);
- if (conf.getTrimmed(confKey) == null) {
- throwBadConfigurationException(getNeedToSetValueMessage(confKey));
- }
- } catch (IllegalArgumentException iae) {
- String errmsg = iae.getMessage();
- if (confKey == null) {
- // Error at addSuffix
- errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID, id);
- }
- throwBadConfigurationException(errmsg);
- }
+ checkAndSetRMRPCAddress(prefix, id, conf);
}
setValue.append(id);
setValue.append(",");
@ -249,9 +236,13 @@ public class HAUtil {
@InterfaceAudience.Private
@VisibleForTesting
static String getConfKeyForRMInstance(String prefix, Configuration conf) {
- return YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS.contains(prefix)
- ? addSuffix(prefix, getRMHAId(conf))
- : prefix;
+ if (!YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS.contains(prefix)) {
+ return prefix;
+ } else {
+ String RMId = getRMHAId(conf);
+ checkAndSetRMRPCAddress(prefix, RMId, conf);
+ return addSuffix(prefix, RMId);
+ }
}
public static String getConfValueForRMInstance(String prefix,
@ -284,4 +275,30 @@ public class HAUtil {
}
return key + "." + suffix;
}
private static void checkAndSetRMRPCAddress(String prefix, String RMId,
Configuration conf) {
String rpcAddressConfKey = null;
try {
rpcAddressConfKey = addSuffix(prefix, RMId);
if (conf.getTrimmed(rpcAddressConfKey) == null) {
String hostNameConfKey = addSuffix(YarnConfiguration.RM_HOSTNAME, RMId);
String confVal = conf.getTrimmed(hostNameConfKey);
if (confVal == null) {
throwBadConfigurationException(getNeedToSetValueMessage(
hostNameConfKey + " or " + addSuffix(prefix, RMId)));
} else {
conf.set(addSuffix(prefix, RMId), confVal + ":"
+ YarnConfiguration.getRMDefaultPortNumber(prefix));
}
}
} catch (IllegalArgumentException iae) {
String errmsg = iae.getMessage();
if (rpcAddressConfKey == null) {
// Error at addSuffix
errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID, RMId);
}
throwBadConfigurationException(errmsg);
}
}
}

View File

@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
@ -108,6 +109,8 @@ public class YarnConfiguration extends Configuration {
public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster-id";
public static final String RM_HOSTNAME = RM_PREFIX + "hostname";
/** The address of the applications manager interface in the RM.*/
public static final String RM_ADDRESS =
RM_PREFIX + "address";
@ -373,11 +376,11 @@ public class YarnConfiguration extends Configuration {
public static final String AUTO_FAILOVER_ENABLED =
AUTO_FAILOVER_PREFIX + "enabled";
- public static final boolean DEFAULT_AUTO_FAILOVER_ENABLED = false;
+ public static final boolean DEFAULT_AUTO_FAILOVER_ENABLED = true;
public static final String AUTO_FAILOVER_EMBEDDED =
AUTO_FAILOVER_PREFIX + "embedded";
- public static final boolean DEFAULT_AUTO_FAILOVER_EMBEDDED = false;
+ public static final boolean DEFAULT_AUTO_FAILOVER_EMBEDDED = true;
public static final String AUTO_FAILOVER_ZK_BASE_PATH =
AUTO_FAILOVER_PREFIX + "zk-base-path";
@ -626,6 +629,7 @@ public class YarnConfiguration extends Configuration {
*/
public static final String NM_LOG_RETAIN_SECONDS = NM_PREFIX
+ "log.retain-seconds";
public static final long DEFAULT_NM_LOG_RETAIN_SECONDS = 3 * 60 * 60;
/**
* Number of threads used in log cleanup. Only applicable if Log aggregation
@ -1037,6 +1041,10 @@ public class YarnConfiguration extends Configuration {
/** ATS store class */
public static final String ATS_STORE = ATS_PREFIX + "store.class";
/** ATS leveldb path */
public static final String ATS_LEVELDB_PATH_PROPERTY =
ATS_PREFIX + "leveldb-apptimeline-store.path";
////////////////////////////////
// Other Configs
////////////////////////////////
@ -1139,4 +1147,27 @@ public class YarnConfiguration extends Configuration {
}
return super.updateConnectAddr(prefix, addr);
}
@Private
public static int getRMDefaultPortNumber(String addressPrefix) {
if (addressPrefix.equals(YarnConfiguration.RM_ADDRESS)) {
return YarnConfiguration.DEFAULT_RM_PORT;
} else if (addressPrefix.equals(YarnConfiguration.RM_SCHEDULER_ADDRESS)) {
return YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT;
} else if (addressPrefix.equals(YarnConfiguration.RM_WEBAPP_ADDRESS)) {
return YarnConfiguration.DEFAULT_RM_WEBAPP_PORT;
} else if (addressPrefix.equals(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS)) {
return YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT;
} else if (addressPrefix
.equals(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS)) {
return YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT;
} else if (addressPrefix.equals(YarnConfiguration.RM_ADMIN_ADDRESS)) {
return YarnConfiguration.DEFAULT_RM_ADMIN_PORT;
} else {
throw new HadoopIllegalArgumentException(
"Invalid RM RPC address Prefix: " + addressPrefix
+ ". The valid value should be one of "
+ YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS);
}
}
}
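Together with the HAUtil change above, this mapping is what lets an HA deployment set only yarn.resourcemanager.hostname.<id> per instance: each unset service address is derived as that hostname plus the service's default port. A hedged sketch of the simplified configuration (ids and hostnames are illustrative; getRMDefaultPortNumber is @Private and is called here only to show the port mapping):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RMHostnameConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
    conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
    conf.set(YarnConfiguration.RM_HOSTNAME + ".rm1", "host1.example.com");
    conf.set(YarnConfiguration.RM_HOSTNAME + ".rm2", "host2.example.com");
    // On startup, checkAndSetRMRPCAddress fills in each missing address,
    // e.g. yarn.resourcemanager.address.rm1 = host1.example.com:8032,
    // using the per-service default port shown above:
    System.out.println(YarnConfiguration.getRMDefaultPortNumber(
        YarnConfiguration.RM_ADDRESS)); // 8032 (DEFAULT_RM_PORT)
  }
}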

View File

@ -436,4 +436,19 @@ public abstract class YarnClient extends AbstractService {
public abstract List<ContainerReport> getContainers(
ApplicationAttemptId applicationAttemptId) throws YarnException,
IOException;
/**
* <p>
* Attempts to move the given application to the given queue.
* </p>
*
* @param appId
* Application to move.
* @param queue
* Queue to place it in to.
* @throws YarnException
* @throws IOException
*/
public abstract void moveApplicationAcrossQueues(ApplicationId appId,
String queue) throws YarnException, IOException;
}
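A hedged usage sketch for the new client API, doing programmatically what the yarn application -movetoqueue command added later in this commit does; the application id and queue name are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class MoveToQueueSketch {
  public static void main(String[] args) throws Exception {
    YarnClient client = YarnClient.createYarnClient();
    client.init(new Configuration());
    client.start();
    try {
      ApplicationId appId = ApplicationId.newInstance(1234, 5);
      // asks the RM scheduler to re-home the running application;
      // FairScheduler support came in via YARN-1499 (see CHANGES.txt)
      client.moveApplicationAcrossQueues(appId, "targetqueue");
    } finally {
      client.stop();
    }
  }
}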

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@ -478,4 +479,12 @@ public class YarnClientImpl extends YarnClient {
}
throw new YarnException("History service is not enabled.");
}
@Override
public void moveApplicationAcrossQueues(ApplicationId appId,
String queue) throws YarnException, IOException {
MoveApplicationAcrossQueuesRequest request =
MoveApplicationAcrossQueuesRequest.newInstance(appId, queue);
rmClient.moveApplicationAcrossQueues(request);
}
}

View File

@ -61,6 +61,7 @@ public class ApplicationCLI extends YarnCLI {
private static final String APP_TYPE_CMD = "appTypes";
private static final String APP_STATE_CMD = "appStates";
private static final String ALLSTATES_OPTION = "ALL";
private static final String QUEUE_CMD = "queue";
public static final String APPLICATION = "application";
public static final String APPLICATION_ATTEMPT = "applicationattempt";
public static final String CONTAINER = "container";
@ -96,6 +97,10 @@ public class ApplicationCLI extends YarnCLI {
+ "and -appStates to filter applications based on application state");
}
opts.addOption(KILL_CMD, true, "Kills the application.");
opts.addOption(MOVE_TO_QUEUE_CMD, true, "Moves the application to a "
+ "different queue.");
opts.addOption(QUEUE_CMD, true, "Works with the movetoqueue command to"
+ " specify which queue to move an application to.");
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
Option appTypeOpt = new Option(APP_TYPE_CMD, true, "Works with -list to "
+ "filter applications based on "
@ -112,6 +117,8 @@ public class ApplicationCLI extends YarnCLI {
appStateOpt.setArgName("States");
opts.addOption(appStateOpt);
opts.getOption(KILL_CMD).setArgName("Application ID");
opts.getOption(MOVE_TO_QUEUE_CMD).setArgName("Application ID");
opts.getOption(QUEUE_CMD).setArgName("Queue Name");
opts.getOption(STATUS_CMD).setArgName("Application ID");
int exitCode = -1;
@ -202,6 +209,13 @@ public class ApplicationCLI extends YarnCLI {
return exitCode;
}
killApplication(cliParser.getOptionValue(KILL_CMD));
} else if (cliParser.hasOption(MOVE_TO_QUEUE_CMD)) {
if (!cliParser.hasOption(QUEUE_CMD)) {
printUsage(opts);
return exitCode;
}
moveApplicationAcrossQueues(cliParser.getOptionValue(MOVE_TO_QUEUE_CMD),
cliParser.getOptionValue(QUEUE_CMD));
} else if (cliParser.hasOption(HELP_CMD)) {
printUsage(opts);
return 0;
@ -366,6 +380,28 @@ public class ApplicationCLI extends YarnCLI {
client.killApplication(appId);
}
}
/**
* Moves the application with the given application id to the given queue
*
* @param applicationId
* @param queue
* @throws YarnException
* @throws IOException
*/
private void moveApplicationAcrossQueues(String applicationId, String queue)
throws YarnException, IOException {
ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
ApplicationReport appReport = client.getApplicationReport(appId);
if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
|| appReport.getYarnApplicationState() == YarnApplicationState.KILLED
|| appReport.getYarnApplicationState() == YarnApplicationState.FAILED) {
sysout.println("Application " + applicationId + " has already finished ");
} else {
sysout.println("Moving application " + applicationId + " to queue " + queue);
client.moveApplicationAcrossQueues(appId, queue);
sysout.println("Successfully completed move.");
}
}
/**
* Prints the application report for an application id.

View File

@ -33,6 +33,7 @@ public abstract class YarnCLI extends Configured implements Tool {
public static final String STATUS_CMD = "status";
public static final String LIST_CMD = "list";
public static final String KILL_CMD = "kill";
public static final String MOVE_TO_QUEUE_CMD = "movetoqueue";
public static final String HELP_CMD = "help";
protected PrintStream sysout;
protected PrintStream syserr;

View File

@ -172,8 +172,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
@Test
public void testAutomaticFailover()
throws YarnException, InterruptedException, IOException {
- conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
- conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED, true);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster");
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 2000);
@ -193,6 +191,7 @@ public class TestRMFailover extends ClientBaseWithFixes {
@Test
public void testWebAppProxyInStandAloneMode() throws YarnException,
InterruptedException, IOException {
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
WebAppProxyServer webAppProxyServer = new WebAppProxyServer();
try {
conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:9099");
@ -227,6 +226,7 @@ public class TestRMFailover extends ClientBaseWithFixes {
@Test
public void testEmbeddedWebAppProxy() throws YarnException,
InterruptedException, IOException {
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster.init(conf);
cluster.start();
getAdminService(0).transitionToActive(req);

View File

@ -675,6 +675,7 @@ public class TestYarnCLI {
int result = spyCli.run(new String[] { "-help" });
Assert.assertTrue(result == 0);
verify(spyCli).printUsage(any(Options.class));
System.err.println(sysOutStream.toString()); //todo sandyt remove this hejfkdsl
Assert.assertEquals(createApplicationCLIHelpMessage(),
sysOutStream.toString());
@ -748,6 +749,56 @@ public class TestYarnCLI {
"' doesn't exist in RM.", ex.getMessage());
}
}
@Test
public void testMoveApplicationAcrossQueues() throws Exception {
ApplicationCLI cli = createAndGetAppCLI();
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
ApplicationReport newApplicationReport2 = ApplicationReport.newInstance(
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport2);
int result = cli.run(new String[] { "-movetoqueue", applicationId.toString(),
"-queue", "targetqueue"});
assertEquals(0, result);
verify(client, times(0)).moveApplicationAcrossQueues(
any(ApplicationId.class), any(String.class));
verify(sysOut).println(
"Application " + applicationId + " has already finished ");
ApplicationReport newApplicationReport = ApplicationReport.newInstance(
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport);
result = cli.run(new String[] { "-movetoqueue", applicationId.toString(),
"-queue", "targetqueue"});
assertEquals(0, result);
verify(client).moveApplicationAcrossQueues(any(ApplicationId.class),
any(String.class));
verify(sysOut).println("Moving application application_1234_0005 to queue targetqueue");
verify(sysOut).println("Successfully completed move.");
doThrow(new ApplicationNotFoundException("Application with id '"
+ applicationId + "' doesn't exist in RM.")).when(client)
.moveApplicationAcrossQueues(applicationId, "targetqueue");
cli = createAndGetAppCLI();
try {
result = cli.run(new String[] { "-movetoqueue", applicationId.toString(),
"-queue", "targetqueue"});
Assert.fail();
} catch (Exception ex) {
Assert.assertTrue(ex instanceof ApplicationNotFoundException);
Assert.assertEquals("Application with id '" + applicationId +
"' doesn't exist in RM.", ex.getMessage());
}
}
@Test
public void testListClusterNodes() throws Exception {
@ -1087,23 +1138,28 @@ public class TestYarnCLI {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("usage: application");
pw.println(" -appStates <States> Works with -list to filter applications based");
pw.println(" on input comma-separated list of application");
pw.println(" states. The valid application state can be one");
pw.println(" of the following:");
pw.println(" ALL,NEW,NEW_SAVING,SUBMITTED,ACCEPTED,RUNNING,");
pw.println(" FINISHED,FAILED,KILLED");
pw.println(" -appTypes <Types> Works with -list to filter applications based");
pw.println(" on input comma-separated list of application");
pw.println(" types.");
pw.println(" -help Displays help for all commands.");
pw.println(" -kill <Application ID> Kills the application.");
pw.println(" -list List applications from the RM. Supports");
pw.println(" optional use of -appTypes to filter");
pw.println(" applications based on application type, and");
pw.println(" -appStates to filter applications based on");
pw.println(" application state");
pw.println(" -status <Application ID> Prints the status of the application.");
pw.println(" -appStates <States> Works with -list to filter applications");
pw.println(" based on input comma-separated list of");
pw.println(" application states. The valid application");
pw.println(" state can be one of the following:");
pw.println(" ALL,NEW,NEW_SAVING,SUBMITTED,ACCEPTED,RUN");
pw.println(" NING,FINISHED,FAILED,KILLED");
pw.println(" -appTypes <Types> Works with -list to filter applications");
pw.println(" based on input comma-separated list of");
pw.println(" application types.");
pw.println(" -help Displays help for all commands.");
pw.println(" -kill <Application ID> Kills the application.");
pw.println(" -list List applications from the RM. Supports");
pw.println(" optional use of -appTypes to filter");
pw.println(" applications based on application type,");
pw.println(" and -appStates to filter applications");
pw.println(" based on application state");
pw.println(" -movetoqueue <Application ID> Moves the application to a different");
pw.println(" queue.");
pw.println(" -queue <Queue Name> Works with the movetoqueue command to");
pw.println(" specify which queue to move an");
pw.println(" application to.");
pw.println(" -status <Application ID> Prints the status of the application.");
pw.close();
String appsHelpStr = baos.toString("UTF-8");
return appsHelpStr;

View File

@ -405,17 +405,20 @@
</property>
<property>
- <description>Enable automatic failover.</description>
+ <description>Enable automatic failover.
+ By default, it is enabled only when HA is enabled</description>
<name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
- <value>false</value>
+ <value>true</value>
</property>
<property>
- <description>Enable embedded automatic failover. The embedded elector
- relies on the RM state store to handle fencing, and is primarily intended
- to be used in conjunction with ZKRMStateStore.</description>
+ <description>Enable embedded automatic failover.
+ By default, it is enabled only when HA is enabled.
+ The embedded elector relies on the RM state store to handle fencing,
+ and is primarily intended to be used in conjunction with ZKRMStateStore.
+ </description>
<name>yarn.resourcemanager.ha.automatic-failover.embedded</name>
- <value>false</value>
+ <value>true</value>
</property>
<property>
@ -1142,7 +1145,13 @@
<property>
<description>Store class name for application timeline store</description>
<name>yarn.ats.store.class</name>
- <value>org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.MemoryApplicationTimelineStore</value>
+ <value>org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.LeveldbApplicationTimelineStore</value>
</property>
+ <property>
+ <description>Store file name for leveldb application timeline store</description>
+ <name>yarn.ats.leveldb-apptimeline-store.path</name>
+ <value>${yarn.log.dir}/ats</value>
+ </property>
<!-- Other configuration -->

View File

@ -117,14 +117,14 @@ public class TestApplicationTimelineRecords {
ATSPutError error1 = new ATSPutError();
error1.setEntityId("entity id 1");
error1.setEntityId("entity type 1");
- error1.setErrorCode(1);
+ error1.setErrorCode(ATSPutError.NO_START_TIME);
atsPutErrors.addError(error1);
List<ATSPutError> errors = new ArrayList<ATSPutError>();
errors.add(error1);
ATSPutError error2 = new ATSPutError();
error2.setEntityId("entity id 2");
error2.setEntityId("entity type 2");
- error2.setErrorCode(2);
+ error2.setErrorCode(ATSPutError.IO_EXCEPTION);
errors.add(error2);
atsPutErrors.addErrors(errors);

View File

@ -159,8 +159,9 @@ public class TestHAUtil {
String confKey =
HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID);
assertEquals("YarnRuntimeException by Configuration#set()",
- HAUtil.BAD_CONFIG_MESSAGE_PREFIX + HAUtil.getNeedToSetValueMessage(confKey),
- e.getMessage());
+ HAUtil.BAD_CONFIG_MESSAGE_PREFIX + HAUtil.getNeedToSetValueMessage(
+ HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, RM1_NODE_ID)
+ + " or " + confKey), e.getMessage());
}
// simulate the case YarnConfiguration.RM_HA_IDS doesn't contain

View File

@ -167,6 +167,25 @@
<artifactId>jersey-test-framework-grizzly2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
@ -78,13 +79,15 @@ public interface ApplicationTimelineReader {
* retrieve (see {@link Field}). If the set of fields
* contains {@link Field#LAST_EVENT_ONLY} and not
* {@link Field#EVENTS}, the most recent event for
- * each entity is retrieved.
+ * each entity is retrieved. If null, retrieves all
+ * fields.
* @return An {@link ATSEntities} object.
+ * @throws IOException
*/
ATSEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
- EnumSet<Field> fieldsToRetrieve);
+ EnumSet<Field> fieldsToRetrieve) throws IOException;
/**
* This method retrieves the entity information for a given entity.
@ -95,11 +98,13 @@ public interface ApplicationTimelineReader {
* retrieve (see {@link Field}). If the set of
* fields contains {@link Field#LAST_EVENT_ONLY} and
* not {@link Field#EVENTS}, the most recent event
- * for each entity is retrieved.
+ * for each entity is retrieved. If null, retrieves
+ * all fields.
* @return An {@link ATSEntity} object.
+ * @throws IOException
*/
ATSEntity getEntity(String entity, String entityType, EnumSet<Field>
- fieldsToRetrieve);
+ fieldsToRetrieve) throws IOException;
/**
* This method retrieves the events for a list of entities all of the same
@ -118,8 +123,9 @@ public interface ApplicationTimelineReader {
* @param eventTypes Restricts the events returned to the given types. If
* null, events of all types will be returned.
* @return An {@link ATSEvents} object.
+ * @throws IOException
*/
ATSEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
- Long windowEnd, Set<String> eventTypes);
+ Long windowEnd, Set<String> eventTypes) throws IOException;
}
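A small call-site sketch under the revised contract, assuming a helper class in the reader's package: a null field set now means "retrieve all fields", and storage failures surface as checked IOExceptions instead of being swallowed; the helper name is illustrative:

import java.io.IOException;

import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;

public class TimelineReaderCallSite {
  static ATSEntity fetchWholeEntity(ApplicationTimelineReader reader,
      String entityId, String entityType) {
    try {
      // null fieldsToRetrieve: every field is returned per the new javadoc
      return reader.getEntity(entityId, entityType, null);
    } catch (IOException e) {
      // e.g. a leveldb read error from the store added later in this commit
      throw new RuntimeException("Timeline store read failed", e);
    }
  }
}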

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import java.io.IOException;
/**
* This interface is for storing application timeline information.
*/
@ -37,7 +39,8 @@ public interface ApplicationTimelineWriter {
*
* @param data An {@link ATSEntities} object.
* @return An {@link ATSPutErrors} object.
+ * @throws IOException
*/
- ATSPutErrors put(ATSEntities data);
+ ATSPutErrors put(ATSEntities data) throws IOException;
}

View File

@ -26,12 +26,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
*/
@Private
@Unstable
- public class EntityId implements Comparable<EntityId> {
+ public class EntityIdentifier implements Comparable<EntityIdentifier> {
private String id;
private String type;
- public EntityId(String id, String type) {
+ public EntityIdentifier(String id, String type) {
this.id = id;
this.type = type;
}
@ -53,7 +53,7 @@ public class EntityId implements Comparable<EntityId> {
}
@Override
- public int compareTo(EntityId other) {
+ public int compareTo(EntityIdentifier other) {
int c = type.compareTo(other.type);
if (c != 0) return c;
return id.compareTo(other.id);
@ -78,7 +78,7 @@ public class EntityId implements Comparable<EntityId> {
return false;
if (getClass() != obj.getClass())
return false;
- EntityId other = (EntityId) obj;
+ EntityIdentifier other = (EntityIdentifier) obj;
if (id == null) {
if (other.id != null)
return false;

View File

@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableUtils;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
* A utility class providing methods for serializing and deserializing
* objects. The {@link #write(Object)}, {@link #read(byte[])} and {@link
* #write(java.io.DataOutputStream, Object)}, {@link
* #read(java.io.DataInputStream)} methods are used by the
* {@link LeveldbApplicationTimelineStore} to store and retrieve arbitrary
* JSON, while the {@link #writeReverseOrderedLong} and {@link
* #readReverseOrderedLong} methods are used to sort entities in descending
* start time order.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class GenericObjectMapper {
private static final byte[] EMPTY_BYTES = new byte[0];
private static final byte LONG = 0x1;
private static final byte INTEGER = 0x2;
private static final byte DOUBLE = 0x3;
private static final byte STRING = 0x4;
private static final byte BOOLEAN = 0x5;
private static final byte LIST = 0x6;
private static final byte MAP = 0x7;
/**
* Serializes an Object into a byte array. Along with {@link #read(byte[]) },
* can be used to serialize an Object and deserialize it into an Object of
* the same type without needing to specify the Object's type,
* as long as it is one of the JSON-compatible objects Long, Integer,
* Double, String, Boolean, List, or Map. The current implementation uses
* ObjectMapper to serialize complex objects (List and Map) while using
* Writable to serialize simpler objects, to produce fewer bytes.
*
* @param o An Object
* @return A byte array representation of the Object
* @throws IOException
*/
public static byte[] write(Object o) throws IOException {
if (o == null)
return EMPTY_BYTES;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
write(new DataOutputStream(baos), o);
return baos.toByteArray();
}
/**
* Serializes an Object and writes it to a DataOutputStream. Along with
* {@link #read(java.io.DataInputStream)}, can be used to serialize an Object
* and deserialize it into an Object of the same type without needing to
* specify the Object's type, as long as it is one of the JSON-compatible
* objects Long, Integer, Double, String, Boolean, List, or Map. The current
* implementation uses ObjectMapper to serialize complex objects (List and
* Map) while using Writable to serialize simpler objects, to produce fewer
* bytes.
*
* @param dos A DataOutputStream
* @param o An Object
* @throws IOException
*/
public static void write(DataOutputStream dos, Object o)
throws IOException {
if (o == null)
return;
if (o instanceof Long) {
dos.write(LONG);
WritableUtils.writeVLong(dos, (Long) o);
} else if(o instanceof Integer) {
dos.write(INTEGER);
WritableUtils.writeVInt(dos, (Integer) o);
} else if(o instanceof Double) {
dos.write(DOUBLE);
dos.writeDouble((Double) o);
} else if (o instanceof String) {
dos.write(STRING);
WritableUtils.writeString(dos, (String) o);
} else if (o instanceof Boolean) {
dos.write(BOOLEAN);
dos.writeBoolean((Boolean) o);
} else if (o instanceof List) {
dos.write(LIST);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(dos, o);
} else if (o instanceof Map) {
dos.write(MAP);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(dos, o);
} else {
throw new IOException("Couldn't serialize object");
}
}
/**
* Deserializes an Object from a byte array created with
* {@link #write(Object)}.
*
* @param b A byte array
* @return An Object
* @throws IOException
*/
public static Object read(byte[] b) throws IOException {
if (b == null || b.length == 0)
return null;
ByteArrayInputStream bais = new ByteArrayInputStream(b);
return read(new DataInputStream(bais));
}
/**
* Reads an Object from a DataInputStream whose data has been written with
* {@link #write(java.io.DataOutputStream, Object)}.
*
* @param dis A DataInputStream
* @return An Object, null if an unrecognized type
* @throws IOException
*/
public static Object read(DataInputStream dis) throws IOException {
byte code = (byte)dis.read();
ObjectMapper mapper;
switch (code) {
case LONG:
return WritableUtils.readVLong(dis);
case INTEGER:
return WritableUtils.readVInt(dis);
case DOUBLE:
return dis.readDouble();
case STRING:
return WritableUtils.readString(dis);
case BOOLEAN:
return dis.readBoolean();
case LIST:
mapper = new ObjectMapper();
return mapper.readValue(dis, ArrayList.class);
case MAP:
mapper = new ObjectMapper();
return mapper.readValue(dis, HashMap.class);
default:
return null;
}
}
/**
* Converts a long to a 8-byte array so that lexicographic ordering of the
* produced byte arrays sort the longs in descending order.
*
* @param l A long
* @return A byte array
*/
public static byte[] writeReverseOrderedLong(long l) {
byte[] b = new byte[8];
b[0] = (byte)(0x7f ^ ((l >> 56) & 0xff));
for (int i = 1; i < 7; i++)
b[i] = (byte)(0xff ^ ((l >> 8*(7-i)) & 0xff));
b[7] = (byte)(0xff ^ (l & 0xff));
return b;
}
/**
* Reads 8 bytes from an array starting at the specified offset and
* converts them to a long. The bytes are assumed to have been created
* with {@link #writeReverseOrderedLong}.
*
* @param b A byte array
* @param offset An offset into the byte array
* @return A long
*/
public static long readReverseOrderedLong(byte[] b, int offset) {
long l = b[offset] & 0xff;
for (int i = 1; i < 8; i++) {
l = l << 8;
l = l | (b[offset+i]&0xff);
}
return l ^ 0x7fffffffffffffffL;
}
}
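A quick check of the two properties the class javadoc claims, assuming GenericObjectMapper is imported from the package above: write/read round-trips JSON-compatible values, and writeReverseOrderedLong makes lexicographic byte order equal descending numeric order:

import java.io.IOException;

import org.apache.hadoop.io.WritableComparator;

public class GenericObjectMapperCheck {
  public static void main(String[] args) throws IOException {
    // the leading type-tag byte lets read() recover the value and its type
    Object value = GenericObjectMapper.read(GenericObjectMapper.write(42L));
    System.out.println(value); // 42

    // a larger timestamp encodes to a lexicographically smaller key, so
    // newer entities sort first under leveldb's byte-wise comparator
    byte[] older = GenericObjectMapper.writeReverseOrderedLong(1000L);
    byte[] newer = GenericObjectMapper.writeReverseOrderedLong(2000L);
    System.out.println(
        WritableComparator.compareBytes(newer, 0, 8, older, 0, 8) < 0); // true
  }
}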

View File

@ -0,0 +1,854 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents.ATSEventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import static org.apache.hadoop.yarn.server.applicationhistoryservice
.apptimeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.applicationhistoryservice
.apptimeline.GenericObjectMapper.writeReverseOrderedLong;
/**
* An implementation of an application timeline store backed by leveldb.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LeveldbApplicationTimelineStore extends AbstractService
implements ApplicationTimelineStore {
private static final Log LOG = LogFactory
.getLog(LeveldbApplicationTimelineStore.class);
private static final String FILENAME = "leveldb-apptimeline-store.ldb";
private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes();
private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes();
private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes();
private static final byte[] PRIMARY_FILTER_COLUMN = "f".getBytes();
private static final byte[] OTHER_INFO_COLUMN = "i".getBytes();
private static final byte[] RELATED_COLUMN = "r".getBytes();
private static final byte[] TIME_COLUMN = "t".getBytes();
private static final byte[] EMPTY_BYTES = new byte[0];
private static final int START_TIME_CACHE_SIZE = 10000;
@SuppressWarnings("unchecked")
private final Map<EntityIdentifier, Long> startTimeCache =
Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE));
private DB db;
public LeveldbApplicationTimelineStore() {
super(LeveldbApplicationTimelineStore.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
Options options = new Options();
options.createIfMissing(true);
JniDBFactory factory = new JniDBFactory();
String path = conf.get(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY);
File p = new File(path);
if (!p.exists())
if (!p.mkdirs())
throw new IOException("Couldn't create directory for leveldb " +
"application timeline store " + path);
LOG.info("Using leveldb path " + path);
db = factory.open(new File(path, FILENAME), options);
super.serviceInit(conf);
}
@Override
protected void serviceStop() throws Exception {
IOUtils.cleanup(LOG, db);
super.serviceStop();
}
private static class KeyBuilder {
private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
private byte[][] b;
private boolean[] useSeparator;
private int index;
private int length;
public KeyBuilder(int size) {
b = new byte[size][];
useSeparator = new boolean[size];
index = 0;
length = 0;
}
public static KeyBuilder newInstance() {
return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
}
public KeyBuilder add(String s) {
return add(s.getBytes(), true);
}
public KeyBuilder add(byte[] t) {
return add(t, false);
}
public KeyBuilder add(byte[] t, boolean sep) {
b[index] = t;
useSeparator[index] = sep;
length += t.length;
if (sep)
length++;
index++;
return this;
}
public byte[] getBytes() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
for (int i = 0; i < index; i++) {
baos.write(b[i]);
if (i < index-1 && useSeparator[i])
baos.write(0x0);
}
return baos.toByteArray();
}
public byte[] getBytesForLookup() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
for (int i = 0; i < index; i++) {
baos.write(b[i]);
if (useSeparator[i])
baos.write(0x0);
}
return baos.toByteArray();
}
}
private static class KeyParser {
private final byte[] b;
private int offset;
public KeyParser(byte[] b, int offset) {
this.b = b;
this.offset = offset;
}
public String getNextString() throws IOException {
if (offset >= b.length)
throw new IOException(
"tried to read nonexistent string from byte array");
int i = 0;
while (offset+i < b.length && b[offset+i] != 0x0)
i++;
String s = new String(b, offset, i);
offset = offset + i + 1;
return s;
}
public long getNextLong() throws IOException {
if (offset+8 >= b.length)
throw new IOException("byte array ran out when trying to read long");
long l = readReverseOrderedLong(b, offset);
offset += 8;
return l;
}
public int getOffset() {
return offset;
}
}
@Override
public ATSEntity getEntity(String entity, String entityType,
EnumSet<Field> fields) throws IOException {
DBIterator iterator = null;
try {
byte[] revStartTime = getStartTime(entity, entityType, null, null, null);
if (revStartTime == null)
return null;
byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).add(revStartTime).add(entity).getBytesForLookup();
iterator = db.iterator();
iterator.seek(prefix);
return getEntity(entity, entityType,
readReverseOrderedLong(revStartTime, 0), fields, iterator, prefix,
prefix.length);
} finally {
IOUtils.cleanup(LOG, iterator);
}
}
/**
* Read entity from a db iterator. If no information is found in the
* specified fields for this entity, return null.
*/
private static ATSEntity getEntity(String entity, String entityType,
Long startTime, EnumSet<Field> fields, DBIterator iterator,
byte[] prefix, int prefixlen) throws IOException {
if (fields == null)
fields = EnumSet.allOf(Field.class);
ATSEntity atsEntity = new ATSEntity();
boolean events = false;
boolean lastEvent = false;
if (fields.contains(Field.EVENTS)) {
events = true;
atsEntity.setEvents(new ArrayList<ATSEvent>());
} else if (fields.contains(Field.LAST_EVENT_ONLY)) {
lastEvent = true;
atsEntity.setEvents(new ArrayList<ATSEvent>());
} else {
atsEntity.setEvents(null);
}
boolean relatedEntities = false;
if (fields.contains(Field.RELATED_ENTITIES)) {
relatedEntities = true;
atsEntity.setRelatedEntities(new HashMap<String, List<String>>());
} else {
atsEntity.setRelatedEntities(null);
}
boolean primaryFilters = false;
if (fields.contains(Field.PRIMARY_FILTERS)) {
primaryFilters = true;
atsEntity.setPrimaryFilters(new HashMap<String, Object>());
} else {
atsEntity.setPrimaryFilters(null);
}
boolean otherInfo = false;
if (fields.contains(Field.OTHER_INFO)) {
otherInfo = true;
atsEntity.setOtherInfo(new HashMap<String, Object>());
} else {
atsEntity.setOtherInfo(null);
}
// iterate through the entity's entry, parsing information if it is part
// of a requested field
for (; iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefixlen, key))
break;
if (key[prefixlen] == PRIMARY_FILTER_COLUMN[0]) {
if (primaryFilters) {
atsEntity.addPrimaryFilter(parseRemainingKey(key,
prefixlen + PRIMARY_FILTER_COLUMN.length),
GenericObjectMapper.read(iterator.peekNext().getValue()));
}
} else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
if (otherInfo) {
atsEntity.addOtherInfo(parseRemainingKey(key,
prefixlen + OTHER_INFO_COLUMN.length),
GenericObjectMapper.read(iterator.peekNext().getValue()));
}
} else if (key[prefixlen] == RELATED_COLUMN[0]) {
if (relatedEntities) {
addRelatedEntity(atsEntity, key,
prefixlen + RELATED_COLUMN.length);
}
} else if (key[prefixlen] == TIME_COLUMN[0]) {
if (events || (lastEvent && atsEntity.getEvents().size() == 0)) {
ATSEvent event = getEntityEvent(null, key, prefixlen +
TIME_COLUMN.length, iterator.peekNext().getValue());
if (event != null) {
atsEntity.addEvent(event);
}
}
} else {
LOG.warn(String.format("Found unexpected column for entity %s of " +
"type %s (0x%02x)", entity, entityType, key[prefixlen]));
}
}
atsEntity.setEntityId(entity);
atsEntity.setEntityType(entityType);
atsEntity.setStartTime(startTime);
return atsEntity;
}
@Override
public ATSEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventType) throws IOException {
ATSEvents atsEvents = new ATSEvents();
if (entityIds == null || entityIds.isEmpty())
return atsEvents;
// create a lexicographically-ordered map from start time to entities
Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
List<EntityIdentifier>>(new Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
o2.length);
}
});
DBIterator iterator = null;
try {
// look up start times for the specified entities
// skip entities with no start time
for (String entity : entityIds) {
byte[] startTime = getStartTime(entity, entityType, null, null, null);
if (startTime != null) {
List<EntityIdentifier> entities = startTimeMap.get(startTime);
if (entities == null) {
entities = new ArrayList<EntityIdentifier>();
startTimeMap.put(startTime, entities);
}
entities.add(new EntityIdentifier(entity, entityType));
}
}
for (Entry<byte[], List<EntityIdentifier>> entry :
startTimeMap.entrySet()) {
// look up the events matching the given parameters (limit,
// start time, end time, event types) for entities whose start times
// were found and add the entities to the return list
byte[] revStartTime = entry.getKey();
for (EntityIdentifier entity : entry.getValue()) {
ATSEventsOfOneEntity atsEntity = new ATSEventsOfOneEntity();
atsEntity.setEntityId(entity.getId());
atsEntity.setEntityType(entityType);
atsEvents.addEvent(atsEntity);
KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).add(revStartTime).add(entity.getId())
.add(TIME_COLUMN);
byte[] prefix = kb.getBytesForLookup();
if (windowEnd == null) {
windowEnd = Long.MAX_VALUE;
}
byte[] revts = writeReverseOrderedLong(windowEnd);
kb.add(revts);
byte[] first = kb.getBytesForLookup();
byte[] last = null;
if (windowStart != null) {
last = KeyBuilder.newInstance().add(prefix)
.add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
}
if (limit == null) {
limit = DEFAULT_LIMIT;
}
iterator = db.iterator();
for (iterator.seek(first); atsEntity.getEvents().size() < limit &&
iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
WritableComparator.compareBytes(key, 0, key.length, last, 0,
last.length) > 0))
break;
ATSEvent event = getEntityEvent(eventType, key, prefix.length,
iterator.peekNext().getValue());
if (event != null)
atsEntity.addEvent(event);
}
}
}
} finally {
IOUtils.cleanup(LOG, iterator);
}
return atsEvents;
}
/**
* Returns true if the byte array begins with the specified prefix.
*/
private static boolean prefixMatches(byte[] prefix, int prefixlen,
byte[] b) {
if (b.length < prefixlen)
return false;
return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
prefixlen) == 0;
}
@Override
public ATSEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields) throws IOException {
if (primaryFilter == null) {
// if no primary filter is specified, prefix the lookup with
// ENTITY_ENTRY_PREFIX
return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
windowStart, windowEnd, secondaryFilters, fields);
} else {
// if a primary filter is specified, prefix the lookup with
// INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
// ENTITY_ENTRY_PREFIX
byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
.add(primaryFilter.getName())
.add(GenericObjectMapper.write(primaryFilter.getValue()), true)
.add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
secondaryFilters, fields);
}
}
/**
* Retrieves a list of entities satisfying given parameters.
*
* @param base A byte array prefix for the lookup
* @param entityType The type of the entity
* @param limit A limit on the number of entities to return
* @param starttime The earliest entity start time to retrieve (exclusive)
* @param endtime The latest entity start time to retrieve (inclusive)
* @param secondaryFilters Filter pairs that the entities should match
* @param fields The set of fields to retrieve
* @return A list of entities
* @throws IOException
*/
private ATSEntities getEntityByTime(byte[] base,
String entityType, Long limit, Long starttime, Long endtime,
Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields)
throws IOException {
DBIterator iterator = null;
try {
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
// only db keys matching the prefix (base + entity type) will be parsed
byte[] prefix = kb.getBytesForLookup();
if (endtime == null) {
// if end time is null, place no restriction on end time
endtime = Long.MAX_VALUE;
}
// using end time, construct a first key that will be seeked to
byte[] revts = writeReverseOrderedLong(endtime);
kb.add(revts);
byte[] first = kb.getBytesForLookup();
byte[] last = null;
if (starttime != null) {
// if start time is not null, set a last key that will not be
// iterated past
last = KeyBuilder.newInstance().add(base).add(entityType)
.add(writeReverseOrderedLong(starttime)).getBytesForLookup();
}
if (limit == null) {
// if limit is not specified, use the default
limit = DEFAULT_LIMIT;
}
ATSEntities atsEntities = new ATSEntities();
iterator = db.iterator();
iterator.seek(first);
// iterate until one of the following conditions is met: limit is
// reached, there are no more keys, the key prefix no longer matches,
// or a start time has been specified and reached/exceeded
while (atsEntities.getEntities().size() < limit && iterator.hasNext()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
WritableComparator.compareBytes(key, 0, key.length, last, 0,
last.length) > 0))
break;
// read the start time and entity from the current key
KeyParser kp = new KeyParser(key, prefix.length);
Long startTime = kp.getNextLong();
String entity = kp.getNextString();
// parse the entity that owns this key, iterating over all keys for
// the entity
ATSEntity atsEntity = getEntity(entity, entityType, startTime,
fields, iterator, key, kp.getOffset());
if (atsEntity == null)
continue;
// determine if the retrieved entity matches the provided secondary
// filters, and if so add it to the list of entities to return
boolean filterPassed = true;
if (secondaryFilters != null) {
for (NameValuePair filter : secondaryFilters) {
Object v = atsEntity.getOtherInfo().get(filter.getName());
if (v == null)
v = atsEntity.getPrimaryFilters().get(filter.getName());
if (v == null || !v.equals(filter.getValue())) {
filterPassed = false;
break;
}
}
}
if (filterPassed)
atsEntities.addEntity(atsEntity);
}
return atsEntities;
} finally {
IOUtils.cleanup(LOG, iterator);
}
}
/**
* Put a single entity. If there is an error, add a PutError to the given
* response.
*/
private void put(ATSEntity atsEntity, ATSPutErrors response) {
WriteBatch writeBatch = null;
try {
writeBatch = db.createWriteBatch();
List<ATSEvent> events = atsEntity.getEvents();
// look up the start time for the entity
byte[] revStartTime = getStartTime(atsEntity.getEntityId(),
atsEntity.getEntityType(), atsEntity.getStartTime(), events,
writeBatch);
if (revStartTime == null) {
// if no start time is found, add an error and return
ATSPutError error = new ATSPutError();
error.setEntityId(atsEntity.getEntityId());
error.setEntityType(atsEntity.getEntityType());
error.setErrorCode(ATSPutError.NO_START_TIME);
response.addError(error);
return;
}
Long revStartTimeLong = readReverseOrderedLong(revStartTime, 0);
Map<String, Object> primaryFilters = atsEntity.getPrimaryFilters();
// write event entries
if (events != null && !events.isEmpty()) {
for (ATSEvent event : events) {
byte[] revts = writeReverseOrderedLong(event.getTimestamp());
byte[] key = createEntityEventKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime, revts,
event.getEventType());
byte[] value = GenericObjectMapper.write(event.getEventInfo());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
}
}
// write related entity entries
Map<String,List<String>> relatedEntities =
atsEntity.getRelatedEntities();
if (relatedEntities != null && !relatedEntities.isEmpty()) {
for (Entry<String, List<String>> relatedEntityList :
relatedEntities.entrySet()) {
String relatedEntityType = relatedEntityList.getKey();
for (String relatedEntityId : relatedEntityList.getValue()) {
// look up start time of related entity
byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
relatedEntityType, null, null, writeBatch);
if (relatedEntityStartTime == null) {
// if start time is not found, set start time of the related
// entity to the start time of this entity, and write it to the
// db and the cache
relatedEntityStartTime = revStartTime;
writeBatch.put(createStartTimeLookupKey(relatedEntityId,
relatedEntityType), relatedEntityStartTime);
startTimeCache.put(new EntityIdentifier(relatedEntityId,
relatedEntityType), revStartTimeLong);
}
// write reverse entry (related entity -> entity)
          byte[] key = createRelatedEntityKey(relatedEntityId,
relatedEntityType, relatedEntityStartTime,
atsEntity.getEntityId(), atsEntity.getEntityType());
writeBatch.put(key, EMPTY_BYTES);
// TODO: write forward entry (entity -> related entity)?
}
}
}
// write primary filter entries
if (primaryFilters != null && !primaryFilters.isEmpty()) {
for (Entry<String, Object> primaryFilter : primaryFilters.entrySet()) {
byte[] key = createPrimaryFilterKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime, primaryFilter.getKey());
byte[] value = GenericObjectMapper.write(primaryFilter.getValue());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
}
}
// write other info entries
Map<String, Object> otherInfo = atsEntity.getOtherInfo();
if (otherInfo != null && !otherInfo.isEmpty()) {
for (Entry<String, Object> i : otherInfo.entrySet()) {
byte[] key = createOtherInfoKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime, i.getKey());
byte[] value = GenericObjectMapper.write(i.getValue());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
}
}
db.write(writeBatch);
} catch (IOException e) {
LOG.error("Error putting entity " + atsEntity.getEntityId() +
" of type " + atsEntity.getEntityType(), e);
ATSPutError error = new ATSPutError();
error.setEntityId(atsEntity.getEntityId());
error.setEntityType(atsEntity.getEntityType());
error.setErrorCode(ATSPutError.IO_EXCEPTION);
response.addError(error);
} finally {
IOUtils.cleanup(LOG, writeBatch);
}
}
/**
* For a given key / value pair that has been written to the db,
* write additional entries to the db for each primary filter.
*/
private static void writePrimaryFilterEntries(WriteBatch writeBatch,
Map<String, Object> primaryFilters, byte[] key, byte[] value)
throws IOException {
if (primaryFilters != null && !primaryFilters.isEmpty()) {
for (Entry<String, Object> p : primaryFilters.entrySet()) {
writeBatch.put(addPrimaryFilterToKey(p.getKey(), p.getValue(),
key), value);
}
}
}
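  // Example with a hypothetical filter: for an entity put with primary filter
  // user=alice, every event, primary filter, and other info entry is written
  // twice: once under its entity key, and once under
  //   INDEXED_ENTRY_PREFIX + "user" + write("alice") + <entity key>
  // so that a lookup by primary filter scans one contiguous key range.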
@Override
public ATSPutErrors put(ATSEntities atsEntities) {
ATSPutErrors response = new ATSPutErrors();
for (ATSEntity atsEntity : atsEntities.getEntities()) {
put(atsEntity, response);
}
return response;
}
/**
* Get the unique start time for a given entity as a byte array that sorts
* the timestamps in reverse order (see {@link
* GenericObjectMapper#writeReverseOrderedLong(long)}).
*
* @param entityId The id of the entity
* @param entityType The type of the entity
* @param startTime The start time of the entity, or null
* @param events A list of events for the entity, or null
* @param writeBatch A leveldb write batch, if the method is called by a
* put as opposed to a get
* @return A byte array
* @throws IOException
*/
private byte[] getStartTime(String entityId, String entityType,
Long startTime, List<ATSEvent> events, WriteBatch writeBatch)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
if (startTime == null) {
// start time is not provided, so try to look it up
if (startTimeCache.containsKey(entity)) {
// found the start time in the cache
startTime = startTimeCache.get(entity);
} else {
// try to look up the start time in the db
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
byte[] v = db.get(b);
if (v == null) {
// did not find the start time in the db
// if this is a put, try to set it from the provided events
if (events == null || writeBatch == null) {
// no events, or not a put, so return null
return null;
}
Long min = Long.MAX_VALUE;
for (ATSEvent e : events)
if (min > e.getTimestamp())
min = e.getTimestamp();
startTime = min;
// selected start time as minimum timestamp of provided events
// write start time to db and cache
writeBatch.put(b, writeReverseOrderedLong(startTime));
startTimeCache.put(entity, startTime);
} else {
// found the start time in the db
startTime = readReverseOrderedLong(v, 0);
if (writeBatch != null) {
// if this is a put, re-add the start time to the cache
startTimeCache.put(entity, startTime);
}
}
}
} else {
// start time is provided
// TODO: verify start time in db as well as cache?
if (startTimeCache.containsKey(entity)) {
// if the start time is already in the cache,
// and it is different from the provided start time,
// use the one from the cache
if (!startTime.equals(startTimeCache.get(entity)))
startTime = startTimeCache.get(entity);
} else if (writeBatch != null) {
// if this is a put, write the provided start time to the db and the
// cache
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
writeBatch.put(b, writeReverseOrderedLong(startTime));
startTimeCache.put(entity, startTime);
}
}
return writeReverseOrderedLong(startTime);
}
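  // Resolution order, summarized: a start time already cached for the entity
  // wins over one provided by the caller; otherwise the cache, then the db,
  // are consulted; and on a put with neither recorded, the minimum event
  // timestamp is adopted and persisted. A get with no recorded start time
  // yields null.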
/**
* Creates a key for looking up the start time of a given entity,
* of the form START_TIME_LOOKUP_PREFIX + entitytype + entity.
*/
private static byte[] createStartTimeLookupKey(String entity,
String entitytype) throws IOException {
return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX)
.add(entitytype).add(entity).getBytes();
}
/**
* Creates an index entry for the given key of the form
* INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key.
*/
private static byte[] addPrimaryFilterToKey(String primaryFilterName,
Object primaryFilterValue, byte[] key) throws IOException {
return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
.add(primaryFilterName)
.add(GenericObjectMapper.write(primaryFilterValue), true).add(key)
.getBytes();
}
/**
* Creates an event key, serializing ENTITY_ENTRY_PREFIX + entitytype +
* revstarttime + entity + TIME_COLUMN + reveventtimestamp + eventtype.
*/
private static byte[] createEntityEventKey(String entity, String entitytype,
byte[] revStartTime, byte[] reveventtimestamp, String eventtype)
throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entitytype).add(revStartTime).add(entity).add(TIME_COLUMN)
.add(reveventtimestamp).add(eventtype).getBytes();
}
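  // Key layout example for a hypothetical entity: an event of type "START" at
  // timestamp t, for entity "app_1" of type "APP" with start time s, lands at
  //   ENTITY_ENTRY_PREFIX + "APP" + writeReverseOrderedLong(s) + "app_1"
  //       + TIME_COLUMN + writeReverseOrderedLong(t) + "START"
  // so all keys for one entity are clustered and its events sort newest-first.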
/**
* Creates an event object from the given key, offset, and value. If the
* event type is not contained in the specified set of event types,
* returns null.
*/
private static ATSEvent getEntityEvent(Set<String> eventTypes, byte[] key,
int offset, byte[] value) throws IOException {
KeyParser kp = new KeyParser(key, offset);
long ts = kp.getNextLong();
String tstype = kp.getNextString();
if (eventTypes == null || eventTypes.contains(tstype)) {
ATSEvent event = new ATSEvent();
event.setTimestamp(ts);
event.setEventType(tstype);
Object o = GenericObjectMapper.read(value);
if (o == null) {
event.setEventInfo(null);
} else if (o instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> m = (Map<String, Object>) o;
event.setEventInfo(m);
} else {
throw new IOException("Couldn't deserialize event info map");
}
return event;
}
return null;
}
/**
* Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
* entitytype + revstarttime + entity + PRIMARY_FILTER_COLUMN + name.
*/
private static byte[] createPrimaryFilterKey(String entity,
String entitytype, byte[] revStartTime, String name) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
.add(revStartTime).add(entity).add(PRIMARY_FILTER_COLUMN).add(name)
.getBytes();
}
/**
* Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entitytype +
* revstarttime + entity + OTHER_INFO_COLUMN + name.
*/
private static byte[] createOtherInfoKey(String entity, String entitytype,
byte[] revStartTime, String name) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
.add(revStartTime).add(entity).add(OTHER_INFO_COLUMN).add(name)
.getBytes();
}
/**
* Creates a string representation of the byte array from the given offset
* to the end of the array (for parsing other info keys).
*/
private static String parseRemainingKey(byte[] b, int offset) {
return new String(b, offset, b.length - offset);
}
/**
* Creates a related entity key, serializing ENTITY_ENTRY_PREFIX +
* entitytype + revstarttime + entity + RELATED_COLUMN + relatedentitytype +
* relatedentity.
*/
  private static byte[] createRelatedEntityKey(String entity,
String entitytype, byte[] revStartTime, String relatedEntity,
String relatedEntityType) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
.add(revStartTime).add(entity).add(RELATED_COLUMN)
.add(relatedEntityType).add(relatedEntity).getBytes();
}
/**
* Parses the related entity from the given key at the given offset and
* adds it to the given entity.
*/
private static void addRelatedEntity(ATSEntity atsEntity, byte[] key,
int offset) throws IOException {
KeyParser kp = new KeyParser(key, offset);
String type = kp.getNextString();
String id = kp.getNextString();
atsEntity.addRelatedEntity(type, id);
}
/**
* Clears the cache to test reloading start times from leveldb (only for
* testing).
*/
@VisibleForTesting
void clearStartTimeCache() {
startTimeCache.clear();
}
}
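
A minimal usage sketch of the store above (not part of this change): it follows the same init/start/stop lifecycle that the leveldb store test later in this commit uses. The leveldb path and the entity id/type are placeholders; passing null as the field set retrieves all fields.

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.LeveldbApplicationTimelineStore;

public class TimelineStoreSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY, "/tmp/ats-leveldb");
    LeveldbApplicationTimelineStore store = new LeveldbApplicationTimelineStore();
    store.init(conf);
    store.start();
    try {
      // an explicit start time avoids the NO_START_TIME put error when the
      // entity carries no events
      ATSEntity entity = new ATSEntity();
      entity.setEntityId("app_0001");
      entity.setEntityType("APP");
      entity.setStartTime(System.currentTimeMillis());
      ATSEntities batch = new ATSEntities();
      batch.setEntities(Collections.singletonList(entity));
      ATSPutErrors errors = store.put(batch);
      System.out.println("put errors: " + errors.getErrors().size());
      // null fields means retrieve everything stored for the entity
      ATSEntity stored = store.getEntity("app_0001", "APP", null);
      System.out.println("start time: " + stored.getStartTime());
    } finally {
      store.stop();
    }
  }
}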

View File

@ -53,8 +53,8 @@ import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
public class MemoryApplicationTimelineStore
extends AbstractService implements ApplicationTimelineStore {
private Map<EntityId, ATSEntity> entities =
new HashMap<EntityId, ATSEntity>();
private Map<EntityIdentifier, ATSEntity> entities =
new HashMap<EntityIdentifier, ATSEntity>();
public MemoryApplicationTimelineStore() {
super(MemoryApplicationTimelineStore.class.getName());
@ -125,7 +125,7 @@ public class MemoryApplicationTimelineStore
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.allOf(Field.class);
}
ATSEntity entity = entities.get(new EntityId(entityId, entityType));
ATSEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
if (entity == null) {
return null;
} else {
@ -152,7 +152,7 @@ public class MemoryApplicationTimelineStore
windowEnd = Long.MAX_VALUE;
}
for (String entityId : entityIds) {
EntityId entityID = new EntityId(entityId, entityType);
EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
ATSEntity entity = entities.get(entityID);
if (entity == null) {
continue;
@ -184,8 +184,8 @@ public class MemoryApplicationTimelineStore
public ATSPutErrors put(ATSEntities data) {
ATSPutErrors errors = new ATSPutErrors();
for (ATSEntity entity : data.getEntities()) {
EntityId entityId =
new EntityId(entity.getEntityId(), entity.getEntityType());
EntityIdentifier entityId =
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
// store entity info in memory
ATSEntity existingEntity = entities.get(entityId);
if (existingEntity == null) {
@ -210,7 +210,7 @@ public class MemoryApplicationTimelineStore
ATSPutError error = new ATSPutError();
error.setEntityId(entityId.getId());
error.setEntityType(entityId.getType());
error.setErrorCode(1);
error.setErrorCode(ATSPutError.NO_START_TIME);
errors.addError(error);
entities.remove(entityId);
continue;
@ -242,12 +242,20 @@ public class MemoryApplicationTimelineStore
continue;
}
for (String idStr : partRelatedEntities.getValue()) {
EntityId relatedEntityId =
new EntityId(idStr, partRelatedEntities.getKey());
EntityIdentifier relatedEntityId =
new EntityIdentifier(idStr, partRelatedEntities.getKey());
ATSEntity relatedEntity = entities.get(relatedEntityId);
if (relatedEntity != null) {
relatedEntity.addRelatedEntity(
existingEntity.getEntityType(), existingEntity.getEntityId());
} else {
relatedEntity = new ATSEntity();
relatedEntity.setEntityId(relatedEntityId.getId());
relatedEntity.setEntityType(relatedEntityId.getType());
relatedEntity.setStartTime(existingEntity.getStartTime());
relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
existingEntity.getEntityId());
entities.put(relatedEntityId, relatedEntity);
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
@ -45,6 +46,8 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
@ -64,6 +67,8 @@ import com.google.inject.Singleton;
//TODO: support XML serialization/deserialization
public class ATSWebServices {
private static final Log LOG = LogFactory.getLog(ATSWebServices.class);
private ApplicationTimelineStore store;
@Inject
@ -143,6 +148,10 @@ public class ATSWebServices {
"windowStart, windowEnd or limit is not a numeric value.");
} catch (IllegalArgumentException e) {
throw new BadRequestException("requested invalid field.");
} catch (IOException e) {
LOG.error("Error getting entities", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
if (entities == null) {
return new ATSEntities();
@ -171,6 +180,10 @@ public class ATSWebServices {
} catch (IllegalArgumentException e) {
throw new BadRequestException(
"requested invalid field.");
} catch (IOException e) {
LOG.error("Error getting entity", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
if (entity == null) {
throw new WebApplicationException(Response.Status.NOT_FOUND);
@ -206,6 +219,10 @@ public class ATSWebServices {
} catch (NumberFormatException e) {
throw new BadRequestException(
"windowStart, windowEnd or limit is not a numeric value.");
} catch (IOException e) {
LOG.error("Error getting entity timelines", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
if (events == null) {
return new ATSEvents();
@ -228,7 +245,13 @@ public class ATSWebServices {
if (entities == null) {
return new ATSPutErrors();
}
return store.put(entities);
try {
return store.put(entities);
} catch (IOException e) {
LOG.error("Error putting entities", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
}
private void init(HttpServletResponse response) {
@ -275,7 +298,17 @@ public class ATSWebServices {
String[] strs = str.split(delimiter);
List<Field> fieldList = new ArrayList<Field>();
for (String s : strs) {
fieldList.add(Field.valueOf(s.toUpperCase()));
s = s.trim().toUpperCase();
if (s.equals("EVENTS"))
fieldList.add(Field.EVENTS);
else if (s.equals("LASTEVENTONLY"))
fieldList.add(Field.LAST_EVENT_ONLY);
else if (s.equals("RELATEDENTITIES"))
fieldList.add(Field.RELATED_ENTITIES);
else if (s.equals("PRIMARYFILTERS"))
fieldList.add(Field.PRIMARY_FILTERS);
else if (s.equals("OTHERINFO"))
fieldList.add(Field.OTHER_INFO);
}
if (fieldList.size() == 0)
return null;
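    // Example request (hypothetical host and port) exercising this parsing:
    //   GET http://localhost:8188/ws/v1/apptimeline/type_1/id_1?fields=events,otherinfo
    // populates only the events and other-info fields of the returned entity,
    // as the TestATSWebServices field tests below demonstrate.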

View File

@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -71,7 +73,7 @@ public class ApplicationTimelineStoreTestUtils {
/**
* Load test data into the given store
*/
protected void loadTestData() {
protected void loadTestData() throws IOException {
ATSEntities atsEntities = new ATSEntities();
Map<String, Object> primaryFilters = new HashMap<String, Object>();
primaryFilters.put("user", "username");
@ -126,7 +128,7 @@ public class ApplicationTimelineStoreTestUtils {
response = store.put(atsEntities);
assertEquals(0, response.getErrors().size());
atsEntities.setEntities(Collections.singletonList(createEntity(entity1b,
entityType1, 123l, Collections.singletonList(ev2), null,
entityType1, 789l, Collections.singletonList(ev2), null,
primaryFilters, otherInfo2)));
response = store.put(atsEntities);
assertEquals(0, response.getErrors().size());
@ -138,11 +140,11 @@ public class ApplicationTimelineStoreTestUtils {
ATSPutError error = response.getErrors().get(0);
assertEquals("badentityid", error.getEntityId());
assertEquals("badentity", error.getEntityType());
assertEquals((Integer) 1, error.getErrorCode());
assertEquals(ATSPutError.NO_START_TIME, error.getErrorCode());
}
/**
* Load veification data
* Load verification data
*/
protected void loadVerificationData() throws Exception {
userFilter = new NameValuePair("user",
@ -197,7 +199,7 @@ public class ApplicationTimelineStoreTestUtils {
events2.add(ev4);
}
public void testGetSingleEntity() {
public void testGetSingleEntity() throws IOException {
// test getting entity info
verifyEntityInfo(null, null, null, null, null, null,
store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class)));
@ -222,6 +224,10 @@ public class ApplicationTimelineStoreTestUtils {
null, null, null, store.getEntity(entity1, entityType1,
EnumSet.of(Field.LAST_EVENT_ONLY)));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, store.getEntity(entity1b, entityType1,
null));
verifyEntityInfo(entity1, entityType1, null, null, primaryFilters, null,
store.getEntity(entity1, entityType1,
EnumSet.of(Field.PRIMARY_FILTERS)));
@ -234,7 +240,7 @@ public class ApplicationTimelineStoreTestUtils {
EnumSet.of(Field.RELATED_ENTITIES)));
}
public void testGetEntities() {
public void testGetEntities() throws IOException {
// test getting entities
assertEquals("nonzero entities size for nonexistent type", 0,
store.getEntities("type_0", null, null, null, null, null,
@ -305,7 +311,7 @@ public class ApplicationTimelineStoreTestUtils {
primaryFilters, otherInfo, entities.get(1));
}
public void testGetEntitiesWithPrimaryFilters() {
public void testGetEntitiesWithPrimaryFilters() throws IOException {
// test using primary filter
assertEquals("nonzero entities size for primary filter", 0,
store.getEntities("type_1", null, null, null,
@ -361,7 +367,7 @@ public class ApplicationTimelineStoreTestUtils {
primaryFilters, otherInfo, entities.get(1));
}
public void testGetEntitiesWithSecondaryFilters() {
public void testGetEntitiesWithSecondaryFilters() throws IOException {
// test using secondary filter
List<ATSEntity> entities = store.getEntities("type_1", null, null, null,
null, goodTestingFilters, EnumSet.allOf(Field.class)).getEntities();
@ -388,7 +394,7 @@ public class ApplicationTimelineStoreTestUtils {
assertEquals(0, entities.size());
}
public void testGetEvents() {
public void testGetEvents() throws IOException {
// test getting entity timelines
SortedSet<String> sortedSet = new TreeSet<String>();
sortedSet.add(entity1);

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparator;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestGenericObjectMapper {
@Test
public void testEncoding() {
testEncoding(Long.MAX_VALUE);
testEncoding(Long.MIN_VALUE);
testEncoding(0l);
testEncoding(128l);
testEncoding(256l);
testEncoding(512l);
testEncoding(-256l);
}
private static void testEncoding(long l) {
byte[] b = GenericObjectMapper.writeReverseOrderedLong(l);
assertEquals("error decoding", l,
GenericObjectMapper.readReverseOrderedLong(b, 0));
byte[] buf = new byte[16];
System.arraycopy(b, 0, buf, 5, 8);
assertEquals("error decoding at offset", l,
GenericObjectMapper.readReverseOrderedLong(buf, 5));
if (l > Long.MIN_VALUE) {
byte[] a = GenericObjectMapper.writeReverseOrderedLong(l-1);
assertEquals("error preserving ordering", 1,
WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length));
}
if (l < Long.MAX_VALUE) {
byte[] c = GenericObjectMapper.writeReverseOrderedLong(l+1);
assertEquals("error preserving ordering", 1,
WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length));
}
}
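  // Why reverse ordering matters (illustrative): leveldb iterates keys in
  // ascending byte order, so embedding writeReverseOrderedLong(timestamp) in
  // a key makes a forward scan visit newer timestamps first; e.g. the
  // encoding of 200L compares byte-wise less than the encoding of 100L,
  // which is the ordering the assertions above verify.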
private static void verify(Object o) throws IOException {
assertEquals(o, GenericObjectMapper.read(GenericObjectMapper.write(o)));
}
@Test
public void testValueTypes() throws IOException {
verify(42l);
verify(42);
verify(1.23);
verify("abc");
verify(true);
List<String> list = new ArrayList<String>();
list.add("123");
list.add("abc");
verify(list);
Map<String,String> map = new HashMap<String,String>();
map.put("k1","v1");
map.put("k2","v2");
verify(map);
}
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestLeveldbApplicationTimelineStore
extends ApplicationTimelineStoreTestUtils {
private FileContext fsContext;
private File fsPath;
@Before
public void setup() throws Exception {
fsContext = FileContext.getLocalFSFileContext();
Configuration conf = new Configuration();
fsPath = new File("target", this.getClass().getSimpleName() +
"-tmpDir").getAbsoluteFile();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
conf.set(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY,
fsPath.getAbsolutePath());
store = new LeveldbApplicationTimelineStore();
store.init(conf);
store.start();
loadTestData();
loadVerificationData();
}
@After
public void tearDown() throws Exception {
store.stop();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
}
@Test
public void testGetSingleEntity() throws IOException {
super.testGetSingleEntity();
((LeveldbApplicationTimelineStore)store).clearStartTimeCache();
super.testGetSingleEntity();
}
@Test
public void testGetEntities() throws IOException {
super.testGetEntities();
}
@Test
public void testGetEntitiesWithPrimaryFilters() throws IOException {
super.testGetEntitiesWithPrimaryFilters();
}
@Test
public void testGetEntitiesWithSecondaryFilters() throws IOException {
super.testGetEntitiesWithSecondaryFilters();
}
@Test
public void testGetEvents() throws IOException {
super.testGetEvents();
}
}

View File

@ -23,6 +23,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class TestMemoryApplicationTimelineStore
extends ApplicationTimelineStoreTestUtils {
@ -46,27 +47,27 @@ public class TestMemoryApplicationTimelineStore
}
@Test
public void testGetSingleEntity() {
public void testGetSingleEntity() throws IOException {
super.testGetSingleEntity();
}
@Test
public void testGetEntities() {
public void testGetEntities() throws IOException {
super.testGetEntities();
}
@Test
public void testGetEntitiesWithPrimaryFilters() {
public void testGetEntitiesWithPrimaryFilters() throws IOException {
super.testGetEntitiesWithPrimaryFilters();
}
@Test
public void testGetEntitiesWithSecondaryFilters() {
public void testGetEntitiesWithSecondaryFilters() throws IOException {
super.testGetEntitiesWithSecondaryFilters();
}
@Test
public void testGetEvents() {
public void testGetEvents() throws IOException {
super.testGetEvents();
}

View File

@ -156,6 +156,43 @@ public class TestATSWebServices extends JerseyTest {
Assert.assertEquals(4, entity.getOtherInfo().size());
}
@Test
public void testGetEntityFields1() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
.path("type_1").path("id_1").queryParam("fields", "events,otherinfo")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSEntity entity = response.getEntity(ATSEntity.class);
Assert.assertNotNull(entity);
Assert.assertEquals("id_1", entity.getEntityId());
Assert.assertEquals("type_1", entity.getEntityType());
Assert.assertEquals(123l, entity.getStartTime().longValue());
Assert.assertEquals(2, entity.getEvents().size());
Assert.assertEquals(0, entity.getPrimaryFilters().size());
Assert.assertEquals(4, entity.getOtherInfo().size());
}
@Test
public void testGetEntityFields2() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
.path("type_1").path("id_1").queryParam("fields", "lasteventonly," +
"primaryfilters,relatedentities")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSEntity entity = response.getEntity(ATSEntity.class);
Assert.assertNotNull(entity);
Assert.assertEquals("id_1", entity.getEntityId());
Assert.assertEquals("type_1", entity.getEntityType());
Assert.assertEquals(123l, entity.getStartTime().longValue());
Assert.assertEquals(1, entity.getEvents().size());
Assert.assertEquals(2, entity.getPrimaryFilters().size());
Assert.assertEquals(0, entity.getOtherInfo().size());
}
@Test
public void testGetEvents() throws Exception {
WebResource r = resource();

View File

@ -73,7 +73,8 @@ public class NonAggregatingLogHandler extends AbstractService implements
protected void serviceInit(Configuration conf) throws Exception {
// Default 3 hours.
this.deleteDelaySeconds =
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
sched = createScheduledThreadPoolExecutor(conf);
super.serviceInit(conf);
}

View File

@ -145,7 +145,8 @@ public class TestNonAggregatingLogHandler {
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
DrainDispatcher dispatcher = createDispatcher(conf);
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Before;
import org.junit.Test;
@ -119,6 +120,7 @@ public class TestRMHA {
*/
@Test (timeout = 30000)
public void testStartAndTransitions() throws IOException {
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
Configuration conf = new YarnConfiguration(configuration);
rm = new MockRM(conf);
rm.init(conf);
@ -178,7 +180,6 @@ public class TestRMHA {
"automatic failover is enabled";
Configuration conf = new YarnConfiguration(configuration);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
rm = new MockRM(conf);
rm.init(conf);
@ -236,6 +237,7 @@ public class TestRMHA {
String errorMessageForEventHandler =
"Expect to get the same number of handlers";
String errorMessageForService = "Expect to get the same number of services";
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
Configuration conf = new YarnConfiguration(configuration);
rm = new MockRM(conf) {
@Override
@ -313,6 +315,57 @@ public class TestRMHA {
}
}
@Test
public void testHAWithRMHostName() {
    // test if both RM_HOSTNAME_{rm_id} and RM_RPCADDRESS_{rm_id} are set;
    // RPC addresses should be read only from the RM_RPCADDRESS_{rm_id} configuration
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
RM1_NODE_ID), "1.1.1.1");
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
RM2_NODE_ID), "0.0.0.0");
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
RM3_NODE_ID), "2.2.2.2");
try {
Configuration conf = new YarnConfiguration(configuration);
rm = new MockRM(conf);
rm.init(conf);
for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
assertEquals("RPC address not set for " + confKey,
RM1_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM1_NODE_ID)));
assertEquals("RPC address not set for " + confKey,
RM2_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM2_NODE_ID)));
assertEquals("RPC address not set for " + confKey,
RM3_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM3_NODE_ID)));
}
} catch (YarnRuntimeException e) {
fail("Should not throw any exceptions.");
}
    // test if only RM_HOSTNAME_{rm_id} is set
configuration.clear();
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + ","
+ RM2_NODE_ID);
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
RM1_NODE_ID), "1.1.1.1");
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
RM2_NODE_ID), "0.0.0.0");
try {
Configuration conf = new YarnConfiguration(configuration);
rm = new MockRM(conf);
rm.init(conf);
assertEquals("RPC address not set for " + YarnConfiguration.RM_ADDRESS,
"1.1.1.1:8032",
conf.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID)));
assertEquals("RPC address not set for " + YarnConfiguration.RM_ADDRESS,
"0.0.0.0:8032",
conf.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID)));
} catch (YarnRuntimeException e) {
fail("Should not throw any exceptions.");
}
}
@SuppressWarnings("rawtypes")
class MyCountingDispatcher extends AbstractService implements Dispatcher {

View File

@ -159,6 +159,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
Configuration conf1 = createHARMConf("rm1,rm2", "rm1", 1234);
conf1.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
ResourceManager rm1 = new ResourceManager();
rm1.init(conf1);
rm1.start();
@ -170,6 +171,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678);
conf2.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
ResourceManager rm2 = new ResourceManager();
rm2.init(conf2);
rm2.start();

View File

@ -42,7 +42,7 @@ public class TestMiniYARNClusterForHA {
@Before
public void setup() throws IOException, InterruptedException {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster = new MiniYARNCluster(TestMiniYARNClusterForHA.class.getName(),
2, 1, 1, 1);
cluster.init(conf);