YARN-2005. Blacklisting support for scheduling AMs. (Anubhav Dhoot via kasha)

This commit is contained in:
Karthik Kambatla 2015-09-13 17:03:15 -07:00
parent 7269906254
commit 81df7b586a
23 changed files with 741 additions and 112 deletions

View File

@ -430,6 +430,8 @@ Release 2.8.0 - UNRELEASED
YARN-4145. Make RMHATestBase abstract so its not run when running all
tests under that namespace (adhoot via rkanter)
YARN-2005. Blacklisting support for scheduling AMs. (Anubhav Dhoot via kasha)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -2025,6 +2025,15 @@ public class YarnConfiguration extends Configuration {
public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
public static final String AM_BLACKLISTING_ENABLED =
YARN_PREFIX + "am.blacklisting.enabled";
public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;
public static final String AM_BLACKLISTING_DISABLE_THRESHOLD =
YARN_PREFIX + "am.blacklisting.disable-failure-threshold";
public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f;
public YarnConfiguration() {
super();
}

View File

@ -2293,4 +2293,22 @@
<value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
</property>
<property>
<description>
Enable/disable blacklisting of hosts for AM based on AM failures on those
hosts.
</description>
<name>yarn.am.blacklisting.enabled</name>
<value>true</value>
</property>
<property>
<description>
Threshold of ratio number of NodeManager hosts that are allowed to be
blacklisted for AM. Beyond this ratio there is no blacklisting to avoid
danger of blacklisting the entire cluster.
</description>
<name>yarn.am.blacklisting.disable-failure-threshold</name>
<value>0.8f</value>
</property>
</configuration>

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
import org.apache.hadoop.classification.InterfaceAudience.Private;
/**
* Tracks blacklists based on failures reported on nodes.
*/
@Private
public interface BlacklistManager {
/**
* Report failure of a container on node.
* @param node that has a container failure
*/
void addNode(String node);
/**
* Get {@link BlacklistUpdates} that indicate which nodes should be
* added or to removed from the blacklist.
* @return {@link BlacklistUpdates}
*/
BlacklistUpdates getBlacklistUpdates();
/**
* Refresh the number of nodemanager hosts available for scheduling.
* @param nodeHostCount is the number of node hosts.
*/
void refreshNodeHostCount(int nodeHostCount);
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import java.util.List;
/**
* Class to track blacklist additions and removals.
*/
@Private
public class BlacklistUpdates {
private List<String> additions;
private List<String> removals;
public BlacklistUpdates(List<String> additions,
List<String> removals) {
this.additions = additions;
this.removals = removals;
}
public List<String> getAdditions() {
return additions;
}
public List<String> getRemovals() {
return removals;
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
import java.util.ArrayList;
/**
* A {@link BlacklistManager} that returns no blacklists.
*/
public class DisabledBlacklistManager implements BlacklistManager{
private static final ArrayList<String> EMPTY_LIST = new ArrayList<String>();
private BlacklistUpdates noBlacklist =
new BlacklistUpdates(EMPTY_LIST, EMPTY_LIST);
@Override
public void addNode(String node) {
}
@Override
public BlacklistUpdates getBlacklistUpdates() {
return noBlacklist;
}
@Override
public void refreshNodeHostCount(int nodeHostCount) {
// Do nothing
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Maintains a list of failed nodes and returns that as long as number of
* blacklisted nodes is below a threshold percentage of total nodes. If more
* than threshold number of nodes are marked as failure they all are returned
* as removal from blacklist so previous additions are reversed.
*/
public class SimpleBlacklistManager implements BlacklistManager {
private int numberOfNodeManagerHosts;
private final double blacklistDisableFailureThreshold;
private final Set<String> blacklistNodes = new HashSet<>();
private static final ArrayList<String> EMPTY_LIST = new ArrayList<>();
private static final Log LOG = LogFactory.getLog(SimpleBlacklistManager.class);
public SimpleBlacklistManager(int numberOfNodeManagerHosts,
double blacklistDisableFailureThreshold) {
this.numberOfNodeManagerHosts = numberOfNodeManagerHosts;
this.blacklistDisableFailureThreshold = blacklistDisableFailureThreshold;
}
@Override
public void addNode(String node) {
blacklistNodes.add(node);
}
@Override
public void refreshNodeHostCount(int nodeHostCount) {
this.numberOfNodeManagerHosts = nodeHostCount;
}
@Override
public BlacklistUpdates getBlacklistUpdates() {
BlacklistUpdates ret;
List<String> blacklist = new ArrayList<>(blacklistNodes);
final int currentBlacklistSize = blacklist.size();
final double failureThreshold = this.blacklistDisableFailureThreshold *
numberOfNodeManagerHosts;
if (currentBlacklistSize < failureThreshold) {
if (LOG.isDebugEnabled()) {
LOG.debug("blacklist size " + currentBlacklistSize + " is less than " +
"failure threshold ratio " + blacklistDisableFailureThreshold +
" out of total usable nodes " + numberOfNodeManagerHosts);
}
ret = new BlacklistUpdates(blacklist, EMPTY_LIST);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("blacklist size " + currentBlacklistSize + " is more than " +
"failure threshold ratio " + blacklistDisableFailureThreshold +
" out of total usable nodes " + numberOfNodeManagerHosts);
}
ret = new BlacklistUpdates(EMPTY_LIST, blacklist);
}
return ret;
}
}

View File

@ -74,6 +74,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@ -133,6 +136,8 @@ public class RMAppImpl implements RMApp, Recoverable {
private final Set<String> applicationTags;
private final long attemptFailuresValidityInterval;
private final boolean amBlacklistingEnabled;
private final float blacklistDisableThreshold;
private Clock systemClock;
@ -456,6 +461,18 @@ public class RMAppImpl implements RMApp, Recoverable {
maxLogAggregationDiagnosticsInMemory = conf.getInt(
YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
amBlacklistingEnabled = conf.getBoolean(
YarnConfiguration.AM_BLACKLISTING_ENABLED,
YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED);
if (amBlacklistingEnabled) {
blacklistDisableThreshold = conf.getFloat(
YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
} else {
blacklistDisableThreshold = 0.0f;
}
}
@Override
@ -797,6 +814,18 @@ public class RMAppImpl implements RMApp, Recoverable {
private void createNewAttempt() {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
BlacklistManager currentAMBlacklist;
if (currentAttempt != null) {
currentAMBlacklist = currentAttempt.getAMBlacklist();
} else {
if (amBlacklistingEnabled) {
currentAMBlacklist = new SimpleBlacklistManager(
scheduler.getNumClusterNodes(), blacklistDisableThreshold);
} else {
currentAMBlacklist = new DisabledBlacklistManager();
}
}
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf,
@ -804,7 +833,8 @@ public class RMAppImpl implements RMApp, Recoverable {
// previously failed attempts(which should not include Preempted,
// hardware error and NM resync) + 1) equal to the max-attempt
// limit.
maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq,
currentAMBlacklist);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/**
@ -184,6 +185,12 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
*/
ApplicationResourceUsageReport getApplicationResourceUsageReport();
/**
* Get the {@link BlacklistManager} that manages blacklists for AM failures
* @return the {@link BlacklistManager} that tracks AM failures.
*/
BlacklistManager getAMBlacklist();
/**
* the start time of the application.
* @return the start time of the application.

View File

@ -36,7 +36,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.crypto.SecretKey;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -71,6 +70,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -182,6 +184,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private RMAppAttemptMetrics attemptMetrics = null;
private ResourceRequest amReq = null;
private BlacklistManager blacklistedNodesForAM = null;
private static final StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
@ -434,6 +437,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq) {
this(appAttemptId, rmContext, scheduler, masterService, submissionContext,
conf, maybeLastAttempt, amReq, new DisabledBlacklistManager());
}
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq,
BlacklistManager amBlacklist) {
this.conf = conf;
this.applicationAttemptId = appAttemptId;
this.rmContext = rmContext;
@ -454,6 +467,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
new RMAppAttemptMetrics(applicationAttemptId, rmContext);
this.amReq = amReq;
this.blacklistedNodesForAM = amBlacklist;
}
@Override
@ -940,11 +954,24 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.amReq.setResourceName(ResourceRequest.ANY);
appAttempt.amReq.setRelaxLocality(true);
appAttempt.getAMBlacklist().refreshNodeHostCount(
appAttempt.scheduler.getNumClusterNodes());
BlacklistUpdates amBlacklist = appAttempt.getAMBlacklist()
.getBlacklistUpdates();
if (LOG.isDebugEnabled()) {
LOG.debug("Using blacklist for AM: additions(" +
amBlacklist.getAdditions() + ") and removals(" +
amBlacklist.getRemovals() + ")");
}
// AM resource has been checked when submission
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId,
Collections.singletonList(appAttempt.amReq),
EMPTY_CONTAINER_RELEASE_LIST, null, null);
EMPTY_CONTAINER_RELEASE_LIST,
amBlacklist.getAdditions(),
amBlacklist.getRemovals());
if (amContainerAllocation != null
&& amContainerAllocation.getContainers() != null) {
assert (amContainerAllocation.getContainers().size() == 0);
@ -1331,6 +1358,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
}
private boolean shouldCountTowardsNodeBlacklisting(int exitStatus) {
return exitStatus == ContainerExitStatus.DISKS_FAILED;
}
private static final class UnmanagedAMAttemptSavedTransition
extends AMLaunchedTransition {
@Override
@ -1694,6 +1725,14 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
NodeId nodeId = containerFinishedEvent.getNodeId();
if (containerFinishedEvent.getContainerStatus() != null) {
if (shouldCountTowardsNodeBlacklisting(containerFinishedEvent
.getContainerStatus().getExitStatus())) {
appAttempt.addAMNodeToBlackList(containerFinishedEvent.getNodeId());
}
} else {
LOG.warn("No ContainerStatus in containerFinishedEvent");
}
finishedContainersSentToAM.putIfAbsent(nodeId,
new ArrayList<ContainerStatus>());
appAttempt.finishedContainersSentToAM.get(nodeId).add(
@ -1708,6 +1747,15 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
}
private void addAMNodeToBlackList(NodeId nodeId) {
blacklistedNodesForAM.addNode(nodeId.getHost().toString());
}
@Override
public BlacklistManager getAMBlacklist() {
return blacklistedNodesForAM;
}
private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent

View File

@ -65,7 +65,8 @@ public class AppSchedulingInfo {
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
final Map<Priority, Map<String, ResourceRequest>> requests =
new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
private Set<String> blacklist = new HashSet<String>();
private Set<String> userBlacklist = new HashSet<>();
private Set<String> amBlacklist = new HashSet<>();
//private final ApplicationStore store;
private ActiveUsersManager activeUsersManager;
@ -217,23 +218,41 @@ public class AppSchedulingInfo {
}
/**
* The ApplicationMaster is updating the blacklist
* The ApplicationMaster is updating the userBlacklist used for containers
* other than AMs.
*
* @param blacklistAdditions resources to be added to the blacklist
* @param blacklistRemovals resources to be removed from the blacklist
* @param blacklistAdditions resources to be added to the userBlacklist
* @param blacklistRemovals resources to be removed from the userBlacklist
*/
synchronized public void updateBlacklist(
public void updateBlacklist(
List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Add to blacklist
updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
blacklistRemovals);
}
/**
* RM is updating blacklist for AM containers.
* @param blacklistAdditions resources to be added to the amBlacklist
* @param blacklistRemovals resources to be added to the amBlacklist
*/
public void updateAMBlacklist(
List<String> blacklistAdditions, List<String> blacklistRemovals) {
updateUserOrAMBlacklist(amBlacklist, blacklistAdditions,
blacklistRemovals);
}
void updateUserOrAMBlacklist(Set<String> blacklist,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
synchronized (blacklist) {
if (blacklistAdditions != null) {
blacklist.addAll(blacklistAdditions);
}
// Remove from blacklist
if (blacklistRemovals != null) {
blacklist.removeAll(blacklistRemovals);
}
}
}
synchronized public Collection<Priority> getPriorities() {
return priorities;
@ -263,8 +282,23 @@ public class AppSchedulingInfo {
return (request == null) ? null : request.getCapability();
}
public synchronized boolean isBlacklisted(String resourceName) {
return blacklist.contains(resourceName);
/**
* Returns if the node is either blacklisted by the user or the system
* @param resourceName the resourcename
* @param useAMBlacklist true if it should check amBlacklist
* @return true if its blacklisted
*/
public boolean isBlacklisted(String resourceName,
boolean useAMBlacklist) {
if (useAMBlacklist){
synchronized (amBlacklist) {
return amBlacklist.contains(resourceName);
}
} else {
synchronized (userBlacklist) {
return userBlacklist.contains(resourceName);
}
}
}
/**
@ -473,19 +507,25 @@ public class AppSchedulingInfo {
this.queue = queue;
}
public synchronized Set<String> getBlackList() {
return this.blacklist;
public Set<String> getBlackList() {
return this.userBlacklist;
}
public synchronized Set<String> getBlackListCopy() {
return new HashSet<>(this.blacklist);
public Set<String> getBlackListCopy() {
synchronized (userBlacklist) {
return new HashSet<>(this.userBlacklist);
}
}
public synchronized void transferStateFromPreviousAppSchedulingInfo(
AppSchedulingInfo appInfo) {
// this.priorities = appInfo.getPriorities();
// this.requests = appInfo.getRequests();
this.blacklist = appInfo.getBlackList();
// This should not require locking the userBlacklist since it will not be
// used by this instance until after setCurrentAppAttempt.
// Should cleanup this to avoid sharing between instances and can
// then remove getBlacklist as well.
this.userBlacklist = appInfo.getBlackList();
}
public synchronized void recoverContainer(RMContainer rmContainer) {

View File

@ -470,16 +470,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
RMContainer rmContainer = i.next();
Container container = rmContainer.getContainer();
ContainerType containerType = ContainerType.TASK;
// The working knowledge is that masterContainer for AM is null as it
// itself is the master container.
RMAppAttempt appAttempt =
rmContext
.getRMApps()
.get(
container.getId().getApplicationAttemptId()
.getApplicationId()).getCurrentAppAttempt();
if (appAttempt.getMasterContainer() == null
&& appAttempt.getSubmissionContext().getUnmanagedAM() == false) {
boolean isWaitingForAMContainer = isWaitingForAMContainer(
container.getId().getApplicationAttemptId().getApplicationId());
if (isWaitingForAMContainer) {
containerType = ContainerType.APPLICATION_MASTER;
}
try {
@ -509,6 +502,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
}
public boolean isWaitingForAMContainer(ApplicationId applicationId) {
// The working knowledge is that masterContainer for AM is null as it
// itself is the master container.
RMAppAttempt appAttempt =
rmContext.getRMApps().get(applicationId).getCurrentAppAttempt();
return (appAttempt != null && appAttempt.getMasterContainer() == null
&& appAttempt.getSubmissionContext().getUnmanagedAM() == false);
}
// Blacklist used for user containers
public synchronized void updateBlacklist(
List<String> blacklistAdditions, List<String> blacklistRemovals) {
if (!isStopped) {
@ -517,8 +520,18 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
}
// Blacklist used for AM containers
public synchronized void updateAMBlacklist(
List<String> blacklistAdditions, List<String> blacklistRemovals) {
if (!isStopped) {
this.appSchedulingInfo.updateAMBlacklist(
blacklistAdditions, blacklistRemovals);
}
}
public boolean isBlacklisted(String resourceName) {
return this.appSchedulingInfo.isBlacklisted(resourceName);
boolean useAMBlacklist = isWaitingForAMContainer(getApplicationId());
return this.appSchedulingInfo.isBlacklisted(resourceName, useAMBlacklist);
}
public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(

View File

@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@ -934,7 +933,13 @@ public class CapacityScheduler extends
}
}
if (application.isWaitingForAMContainer(application.getApplicationId())) {
// Allocate is for AM and update AM blacklist for this
application.updateAMBlacklist(
blacklistAdditions, blacklistRemovals);
} else {
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
}
allocation = application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import org.apache.commons.logging.Log;
public class FiCaSchedulerUtils {
public static boolean isBlacklisted(FiCaSchedulerApp application,
FiCaSchedulerNode node, Log LOG) {
if (application.isBlacklisted(node.getNodeName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'host' " + node.getNodeName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}
return true;
}
if (application.isBlacklisted(node.getRackName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'rack' " + node.getRackName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}
return true;
}
return false;
}
}

View File

@ -955,7 +955,14 @@ public class FairScheduler extends
preemptionContainerIds.add(container.getContainerId());
}
if (application.isWaitingForAMContainer(application.getApplicationId())) {
// Allocate is for AM and update AM blacklist for this
application.updateAMBlacklist(
blacklistAdditions, blacklistRemovals);
} else {
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
}
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();

View File

@ -356,7 +356,14 @@ public class FifoScheduler extends
" #ask=" + ask.size());
}
if (application.isWaitingForAMContainer(application.getApplicationId())) {
// Allocate is for AM and update AM blacklist for this
application.updateAMBlacklist(
blacklistAdditions, blacklistRemovals);
} else {
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
}
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
Resource headroom = application.getHeadroom();

View File

@ -750,10 +750,7 @@ public class MockRM extends ResourceManager {
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt();
waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
System.out.println("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
@ -761,6 +758,15 @@ public class MockRM extends ResourceManager {
return am;
}
public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm)
throws Exception {
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt();
waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
return attempt;
}
public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
MockAM am = launchAM(app, rm, nm);

View File

@ -35,8 +35,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -49,11 +53,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
@ -82,21 +89,7 @@ public class TestAMRestart {
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
int NUM_CONTAINERS = 3;
// allocate NUM_CONTAINERS containers
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
// wait for containers to be allocated.
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.size() != NUM_CONTAINERS) {
nm1.nodeHeartbeat(true);
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(200);
}
allocateContainers(nm1, am1, NUM_CONTAINERS);
// launch the 2nd container, for testing running container transferred.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
@ -244,6 +237,29 @@ public class TestAMRestart {
rm1.stop();
}
private List<Container> allocateContainers(MockNM nm1, MockAM am1,
int NUM_CONTAINERS) throws Exception {
// allocate NUM_CONTAINERS containers
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
// wait for containers to be allocated.
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.size() != NUM_CONTAINERS) {
nm1.nodeHeartbeat(true);
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(200);
}
Assert.assertEquals("Did not get all containers allocated",
NUM_CONTAINERS, containers.size());
return containers;
}
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
throws InterruptedException {
int count = 0;
@ -258,6 +274,9 @@ public class TestAMRestart {
public void testNMTokensRebindOnAMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
// To prevent test from blacklisting nm1 for AM, we sit threshold to half
// of 2 nodes which is 1
conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 0.5f);
MockRM rm1 = new MockRM(conf);
rm1.start();
@ -355,6 +374,106 @@ public class TestAMRestart {
rm1.stop();
}
@Test(timeout = 100000)
public void testAMBlacklistPreventsRestartOnSameNode() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = new MockRM(conf, memStore) {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) {
@Override
public void handle(SchedulerEvent event) {
scheduler.handle(event);
}
};
}
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
nm1.registerNode();
MockNM nm2 =
new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService());
nm2.registerNode();
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
CapacityScheduler scheduler =
(CapacityScheduler) rm1.getResourceScheduler();
ContainerId amContainer =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Preempt the first attempt;
RMContainer rmContainer = scheduler.getRMContainer(amContainer);
NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
MockNM currentNode, otherNode;
if (nodeWhereAMRan == nm1.getNodeId()) {
currentNode = nm1;
otherNode = nm2;
} else {
currentNode = nm2;
otherNode = nm1;
}
ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
"", ContainerExitStatus.DISKS_FAILED);
currentNode.containerStatus(containerStatus);
am1.waitForState(RMAppAttemptState.FAILED);
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// restart the am
RMAppAttempt attempt = rm1.waitForAttemptScheduled(app1, rm1);
System.out.println("Launch AM " + attempt.getAppAttemptId());
currentNode.nodeHeartbeat(true);
dispatcher.await();
Assert.assertEquals(
"AppAttemptState should still be SCHEDULED if currentNode is " +
"blacklisted correctly",
RMAppAttemptState.SCHEDULED,
attempt.getAppAttemptState());
otherNode.nodeHeartbeat(true);
dispatcher.await();
MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId());
rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
amContainer =
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
rmContainer = scheduler.getRMContainer(amContainer);
nodeWhereAMRan = rmContainer.getAllocatedNode();
Assert.assertEquals(
"After blacklisting AM should have run on the other node",
otherNode.getNodeId(), nodeWhereAMRan);
am2.registerAppAttempt();
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
List<Container> allocatedContainers =
allocateContainers(currentNode, am2, 1);
Assert.assertEquals(
"Even though AM is blacklisted from the node, application can still " +
"allocate containers there",
currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
}
// AM container preempted, nm disk failure
// should not be counted towards AM max retry count.
@Test(timeout = 100000)

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
public class TestBlacklistManager {
@Test
public void testSimpleBlacklistBelowFailureThreshold() {
final int numberOfNodeManagerHosts = 3;
final double blacklistDisableFailureThreshold = 0.8;
BlacklistManager manager = new SimpleBlacklistManager(
numberOfNodeManagerHosts, blacklistDisableFailureThreshold);
String anyNode = "foo";
String anyNode2 = "bar";
manager.addNode(anyNode);
manager.addNode(anyNode2);
BlacklistUpdates blacklist = manager
.getBlacklistUpdates();
List<String> blacklistAdditions = blacklist.getAdditions();
Collections.sort(blacklistAdditions);
List<String> blacklistRemovals = blacklist.getRemovals();
String[] expectedBlacklistAdditions = new String[]{anyNode2, anyNode};
Assert.assertArrayEquals(
"Blacklist additions was not as expected",
expectedBlacklistAdditions,
blacklistAdditions.toArray());
Assert.assertTrue(
"Blacklist removals should be empty but was " +
blacklistRemovals,
blacklistRemovals.isEmpty());
}
@Test
public void testSimpleBlacklistAboveFailureThreshold() {
// Create a threshold of 0.5 * 3 i.e at 1.5 node failures.
BlacklistManager manager = new SimpleBlacklistManager(3, 0.5);
String anyNode = "foo";
String anyNode2 = "bar";
manager.addNode(anyNode);
BlacklistUpdates blacklist = manager
.getBlacklistUpdates();
List<String> blacklistAdditions = blacklist.getAdditions();
Collections.sort(blacklistAdditions);
List<String> blacklistRemovals = blacklist.getRemovals();
String[] expectedBlacklistAdditions = new String[]{anyNode};
Assert.assertArrayEquals(
"Blacklist additions was not as expected",
expectedBlacklistAdditions,
blacklistAdditions.toArray());
Assert.assertTrue(
"Blacklist removals should be empty but was " +
blacklistRemovals,
blacklistRemovals.isEmpty());
manager.addNode(anyNode2);
blacklist = manager
.getBlacklistUpdates();
blacklistAdditions = blacklist.getAdditions();
Collections.sort(blacklistAdditions);
blacklistRemovals = blacklist.getRemovals();
Collections.sort(blacklistRemovals);
String[] expectedBlacklistRemovals = new String[] {anyNode2, anyNode};
Assert.assertTrue(
"Blacklist additions should be empty but was " +
blacklistAdditions,
blacklistAdditions.isEmpty());
Assert.assertArrayEquals(
"Blacklist removals was not as expected",
expectedBlacklistRemovals,
blacklistRemovals.toArray());
}
@Test
public void testDisabledBlacklist() {
BlacklistManager disabled = new DisabledBlacklistManager();
String anyNode = "foo";
disabled.addNode(anyNode);
BlacklistUpdates blacklist = disabled
.getBlacklistUpdates();
List<String> blacklistAdditions = blacklist.getAdditions();
List<String> blacklistRemovals = blacklist.getRemovals();
Assert.assertTrue(
"Blacklist additions should be empty but was " +
blacklistAdditions,
blacklistAdditions.isEmpty());
Assert.assertTrue(
"Blacklist removals should be empty but was " +
blacklistRemovals,
blacklistRemovals.isEmpty());
}
}

View File

@ -489,7 +489,7 @@ public class TestRMAppLogAggregationStatus {
2, Resource.newInstance(10, 2), "test");
return new RMAppImpl(this.appId, this.rmContext,
conf, "test", "test", "default", submissionContext,
this.rmContext.getScheduler(),
scheduler,
this.rmContext.getApplicationMasterService(),
System.currentTimeMillis(), "test",
null, null);

View File

@ -970,7 +970,7 @@ public class TestRMAppTransitions {
appState.getApplicationSubmissionContext().getApplicationId(),
rmContext, conf,
submissionContext.getApplicationName(), null,
submissionContext.getQueue(), submissionContext, null, null,
submissionContext.getQueue(), submissionContext, scheduler, null,
appState.getSubmitTime(), submissionContext.getApplicationType(),
submissionContext.getApplicationTags(),
BuilderUtils.newResourceRequest(

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -655,6 +656,11 @@ public class TestCapacityScheduler {
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
@ -715,6 +721,11 @@ public class TestCapacityScheduler {
RMAppImpl app1 = mock(RMAppImpl.class);
when(app1.getApplicationId()).thenReturn(appId1);
RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt1.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
@ -739,6 +750,8 @@ public class TestCapacityScheduler {
RMAppImpl app2 = mock(RMAppImpl.class);
when(app2.getApplicationId()).thenReturn(appId2);
RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
when(attempt2.getMasterContainer()).thenReturn(container);
when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
@ -2876,6 +2889,11 @@ public class TestCapacityScheduler {
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
@ -2953,6 +2971,11 @@ public class TestCapacityScheduler {
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
@ -2976,6 +2999,8 @@ public class TestCapacityScheduler {
RMAppImpl app2 = mock(RMAppImpl.class);
when(app2.getApplicationId()).thenReturn(appId2);
RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
when(attempt2.getMasterContainer()).thenReturn(container);
when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
when(app2.getCurrentAppAttempt()).thenReturn(attempt2);

View File

@ -220,7 +220,7 @@ public class FairSchedulerTestBase {
ApplicationId appId = attId.getApplicationId();
RMApp rmApp = new RMAppImpl(appId, rmContext, conf,
null, user, null, ApplicationSubmissionContext.newInstance(appId, null,
queue, null, null, false, false, 0, amResource, null), null, null,
queue, null, null, false, false, 0, amResource, null), scheduler, null,
0, null, null, null);
rmContext.getRMApps().put(appId, rmApp);
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);