MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode. Contributed by Siddharth Seth.

svn merge -c 1214429 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1214432 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-12-14 19:58:00 +00:00
parent 3ae9ee1005
commit 4ba1442b67
21 changed files with 606 additions and 62 deletions

View File

@ -242,6 +242,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3557. MR1 test fail to compile because of missing hadoop-archives
dependency. (tucu)
MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode.
(Siddharth Seth via vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -91,12 +91,7 @@ public class YarnConfiguration extends Configuration {
public static final String RM_CLIENT_THREAD_COUNT =
RM_PREFIX + "client.thread-count";
public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 10;
/** The expiry interval for application master reporting.*/
public static final String RM_AM_EXPIRY_INTERVAL_MS =
RM_PREFIX + "am.liveness-monitor.expiry-interval-ms";
public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000;
/** The Kerberos principal for the resource manager.*/
public static final String RM_PRINCIPAL =
RM_PREFIX + "principal";
@ -126,7 +121,17 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8025;
public static final String DEFAULT_RM_RESOURCE_TRACKER_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_RESOURCE_TRACKER_PORT;
/** The expiry interval for application master reporting.*/
public static final String RM_AM_EXPIRY_INTERVAL_MS =
YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms";
public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000;
/** How long to wait until a node manager is considered dead.*/
public static final String RM_NM_EXPIRY_INTERVAL_MS =
YARN_PREFIX + "nm.liveness-monitor.expiry-interval-ms";
public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000;
/** Are acls enabled.*/
public static final String YARN_ACL_ENABLE =
YARN_PREFIX + "acl.enable";
@ -160,12 +165,7 @@ public class YarnConfiguration extends Configuration {
/** The keytab for the resource manager.*/
public static final String RM_KEYTAB =
RM_PREFIX + "keytab";
/** How long to wait until a node manager is considered dead.*/
public static final String RM_NM_EXPIRY_INTERVAL_MS =
RM_PREFIX + "nm.liveness-monitor.expiry-interval-ms";
public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000;
/** How long to wait until a container is considered dead.*/
public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS =
RM_PREFIX + "rm.container-allocation.expiry-interval-ms";
@ -293,10 +293,16 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
/** Interval at which the delayed token removal thread runs */
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
30000l;
/** Whether to enable log aggregation */
public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
public static final boolean DEFAULT_NM_LOG_AGGREGATION_ENABLED = false;
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
/**
* Number of seconds to retain logs on the NodeManager. Only applicable if Log

View File

@ -53,8 +53,8 @@ public class AggregatedLogsBlock extends HtmlBlock {
logEntity = containerId.toString();
}
if (!conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
html.h1()
._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
._();

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.records;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -33,6 +34,9 @@ public interface NodeStatus {
public abstract void setContainersStatuses(
List<ContainerStatus> containersStatuses);
public abstract List<ApplicationId> getKeepAliveApplications();
public abstract void setKeepAliveApplications(List<ApplicationId> appIds);
NodeHealthStatus getNodeHealthStatus();
void setNodeHealthStatus(NodeHealthStatus healthStatus);

View File

@ -23,13 +23,16 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeHealthStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@ -37,7 +40,9 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements NodeStatus {
public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
NodeStatus {
NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
NodeStatusProto.Builder builder = null;
boolean viaProto = false;
@ -45,6 +50,7 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
private NodeId nodeId = null;
private List<ContainerStatus> containers = null;
private NodeHealthStatus nodeHealthStatus = null;
private List<ApplicationId> keepAliveApplications = null;
public NodeStatusPBImpl() {
builder = NodeStatusProto.newBuilder();
@ -55,15 +61,14 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
viaProto = true;
}
public NodeStatusProto getProto() {
mergeLocalToProto();
public synchronized NodeStatusProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
private synchronized void mergeLocalToBuilder() {
if (this.nodeId != null) {
builder.setNodeId(convertToProtoFormat(this.nodeId));
}
@ -73,9 +78,12 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
if (this.nodeHealthStatus != null) {
builder.setNodeHealthStatus(convertToProtoFormat(this.nodeHealthStatus));
}
if (this.keepAliveApplications != null) {
addKeepAliveApplicationsToProto();
}
}
private void mergeLocalToProto() {
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
@ -84,14 +92,14 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
viaProto = true;
}
private void maybeInitBuilder() {
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = NodeStatusProto.newBuilder(proto);
}
viaProto = false;
}
private void addContainersToProto() {
private synchronized void addContainersToProto() {
maybeInitBuilder();
builder.clearContainersStatuses();
if (containers == null)
@ -124,19 +132,53 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
};
builder.addAllContainersStatuses(iterable);
}
private synchronized void addKeepAliveApplicationsToProto() {
maybeInitBuilder();
builder.clearKeepAliveApplications();
if (keepAliveApplications == null)
return;
Iterable<ApplicationIdProto> iterable = new Iterable<ApplicationIdProto>() {
@Override
public Iterator<ApplicationIdProto> iterator() {
return new Iterator<ApplicationIdProto>() {
Iterator<ApplicationId> iter = keepAliveApplications.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ApplicationIdProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllKeepAliveApplications(iterable);
}
@Override
public int getResponseId() {
public synchronized int getResponseId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
return p.getResponseId();
}
@Override
public void setResponseId(int responseId) {
public synchronized void setResponseId(int responseId) {
maybeInitBuilder();
builder.setResponseId(responseId);
}
@Override
public NodeId getNodeId() {
public synchronized NodeId getNodeId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
if (this.nodeId != null) {
return this.nodeId;
@ -148,8 +190,9 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
return this.nodeId;
}
@Override
public void setNodeId(NodeId nodeId) {
public synchronized void setNodeId(NodeId nodeId) {
maybeInitBuilder();
if (nodeId == null)
builder.clearNodeId();
@ -158,20 +201,35 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
}
@Override
public List<ContainerStatus> getContainersStatuses() {
public synchronized List<ContainerStatus> getContainersStatuses() {
initContainers();
return this.containers;
}
@Override
public void setContainersStatuses(List<ContainerStatus> containers) {
public synchronized void setContainersStatuses(
List<ContainerStatus> containers) {
if (containers == null) {
builder.clearContainersStatuses();
}
this.containers = containers;
}
@Override
public synchronized List<ApplicationId> getKeepAliveApplications() {
initKeepAliveApplications();
return this.keepAliveApplications;
}
@Override
public synchronized void setKeepAliveApplications(List<ApplicationId> appIds) {
if (appIds == null) {
builder.clearKeepAliveApplications();
}
this.keepAliveApplications = appIds;
}
private void initContainers() {
private synchronized void initContainers() {
if (this.containers != null) {
return;
}
@ -185,8 +243,22 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
}
private synchronized void initKeepAliveApplications() {
if (this.keepAliveApplications != null) {
return;
}
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
List<ApplicationIdProto> list = p.getKeepAliveApplicationsList();
this.keepAliveApplications = new ArrayList<ApplicationId>();
for (ApplicationIdProto c : list) {
this.keepAliveApplications.add(convertFromProtoFormat(c));
}
}
@Override
public NodeHealthStatus getNodeHealthStatus() {
public synchronized NodeHealthStatus getNodeHealthStatus() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
if (nodeHealthStatus != null) {
return nodeHealthStatus;
@ -199,7 +271,7 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
}
@Override
public void setNodeHealthStatus(NodeHealthStatus healthStatus) {
public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) {
maybeInitBuilder();
if (healthStatus == null) {
builder.clearNodeHealthStatus();
@ -231,4 +303,12 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
return ((ContainerStatusPBImpl)c).getProto();
}
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) {
return new ApplicationIdPBImpl(c);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId c) {
return ((ApplicationIdPBImpl)c).getProto();
}
}

View File

@ -34,6 +34,7 @@ message NodeStatusProto {
optional int32 response_id = 2;
repeated ContainerStatusProto containersStatuses = 3;
optional NodeHealthStatusProto nodeHealthStatus = 4;
repeated ApplicationIdProto keep_alive_applications = 5;
}
message RegistrationResponseProto {

View File

@ -72,7 +72,7 @@
<property>
<description>The expiry interval for application master reporting.</description>
<name>yarn.resourcemanager.am.liveness-monitor.expiry-interval-ms</name>
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>
<value>600000</value>
</property>
@ -155,7 +155,7 @@
<property>
<description>How long to wait until a node manager is considered dead.</description>
<name>yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms</name>
<name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
<value>600000</value>
</property>
@ -210,6 +210,12 @@
<value>10000</value>
</property>
<property>
<description>Interval at which the delayed token removal thread runs</description>
<name>yarn.resourcemanager.delayed.delegation-token.removal-interval-ms</name>
<value>30000</value>
</property>
<!-- Node Manager Configs -->
<property>
<description>address of node manager IPC.</description>
@ -304,7 +310,7 @@
<property>
<description>Whether to enable log aggregation</description>
<name>yarn.nodemanager.log-aggregation-enable</name>
<name>yarn.log-aggregation-enable</name>
<value>false</value>
</property>

View File

@ -20,8 +20,12 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Map.Entry;
import org.apache.avro.AvroRuntimeException;
@ -56,6 +60,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdater {
@ -76,6 +81,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private byte[] secretKeyBytes = new byte[0];
private boolean isStopped;
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean tokenKeepAliveEnabled;
private long tokenRemovalDelayMs;
/** Keeps track of when the next keep alive request should be sent for an app*/
private Map<ApplicationId, Long> appTokenKeepAliveMap =
new HashMap<ApplicationId, Long>();
private Random keepAliveDelayRandom = new Random();
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
@ -103,6 +114,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.totalResource = recordFactory.newRecordInstance(Resource.class);
this.totalResource.setMemory(memoryMb);
metrics.addResource(totalResource);
this.tokenKeepAliveEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
&& isSecurityEnabled();
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
super.init(conf);
}
@ -139,6 +157,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
super.stop();
}
protected boolean isSecurityEnabled() {
return UserGroupInformation.isSecurityEnabled();
}
protected ResourceTracker getRMClient() {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
@ -188,6 +210,29 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return this.secretKeyBytes.clone();
}
private List<ApplicationId> createKeepAliveApplicationList() {
if (!tokenKeepAliveEnabled) {
return Collections.emptyList();
}
List<ApplicationId> appList = new ArrayList<ApplicationId>();
for (Iterator<Entry<ApplicationId, Long>> i =
this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) {
Entry<ApplicationId, Long> e = i.next();
ApplicationId appId = e.getKey();
Long nextKeepAlive = e.getValue();
if (!this.context.getApplications().containsKey(appId)) {
// Remove if the application has finished.
i.remove();
} else if (System.currentTimeMillis() > nextKeepAlive) {
// KeepAlive list for the next hearbeat.
appList.add(appId);
trackAppForKeepAlive(appId);
}
}
return appList;
}
private NodeStatus getNodeStatus() {
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
@ -231,9 +276,29 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
nodeStatus.setKeepAliveApplications(keepAliveAppIds);
return nodeStatus;
}
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
for (ApplicationId appId : appIds) {
trackAppForKeepAlive(appId);
}
}
}
private void trackAppForKeepAlive(ApplicationId appId) {
// Next keepAlive request for app between 0.7 & 0.9 of when the token will
// likely expire.
long nextTime = System.currentTimeMillis()
+ (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs
* keepAliveDelayRandom.nextInt(100))/100);
appTokenKeepAliveMap.put(appId, nextTime);
}
@Override
public void sendOutofBandHeartBeat() {
synchronized (this.heartbeatMonitor) {
@ -245,6 +310,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
new Thread("Node Status Updater") {
@Override
@SuppressWarnings("unchecked")
public void run() {
int lastHeartBeatID = 0;
while (!isStopped) {
@ -284,6 +350,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanupList();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (appsToCleanup.size() != 0) {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));

View File

@ -192,8 +192,8 @@ public class ContainerManagerImpl extends CompositeService implements
protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
return new LogAggregationService(this.dispatcher, context,
deletionService, dirsHandler);
} else {

View File

@ -170,6 +170,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.writer.closeWriter();
LOG.info("Finished aggregate log-file for app " + this.applicationId);
}
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override

View File

@ -88,8 +88,8 @@ public class NMController extends Controller implements YarnWebParams {
containerId.getApplicationAttemptId().getApplicationId();
Application app = nmContext.getApplications().get(appId);
if (app == null
&& nmConf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
&& nmConf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
String redirectUrl = null;
if (logServerUrl == null || logServerUrl.isEmpty()) {

View File

@ -22,7 +22,9 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@ -63,10 +66,12 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.mock;
public class TestNodeStatusUpdater {
@ -216,7 +221,7 @@ public class TestNodeStatusUpdater {
HeartbeatResponse response = recordFactory
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response);
@ -241,6 +246,48 @@ public class TestNodeStatusUpdater {
return resourceTracker;
}
}
private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker;
private Context context;
public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
ContainerTokenSecretManager containerTokenSecretManager) {
super(context, dispatcher, healthChecker, metrics,
containerTokenSecretManager);
this.context = context;
this.resourceTracker = new MyResourceTracker3(this.context);
}
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
}
@Override
protected boolean isSecurityEnabled() {
return true;
}
}
private class MyNodeManager extends NodeManager {
private MyNodeStatusUpdater3 nodeStatusUpdater;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
this.nodeStatusUpdater =
new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics,
containerTokenSecretManager);
return this.nodeStatusUpdater;
}
protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
return this.nodeStatusUpdater;
}
}
//
private class MyResourceTracker2 implements ResourceTracker {
@ -276,6 +323,65 @@ public class TestNodeStatusUpdater {
}
}
private class MyResourceTracker3 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
public NodeAction registerNodeAction = NodeAction.NORMAL;
private Map<ApplicationId, List<Long>> keepAliveRequests =
new HashMap<ApplicationId, List<Long>>();
private ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
private final Context context;
MyResourceTracker3(Context context) {
this.context = context;
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException {
RegisterNodeManagerResponse response =
recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
RegistrationResponse regResponse =
recordFactory.newRecordInstance(RegistrationResponse.class);
regResponse.setNodeAction(registerNodeAction);
response.setRegistrationResponse(regResponse);
return response;
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException {
LOG.info("Got heartBeatId: [" + heartBeatID +"]");
NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++);
HeartbeatResponse response =
recordFactory.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
response.setNodeAction(heartBeatNodeAction);
if (nodeStatus.getKeepAliveApplications() != null
&& nodeStatus.getKeepAliveApplications().size() > 0) {
for (ApplicationId appId : nodeStatus.getKeepAliveApplications()) {
List<Long> list = keepAliveRequests.get(appId);
if (list == null) {
list = new LinkedList<Long>();
keepAliveRequests.put(appId, list);
}
list.add(System.currentTimeMillis());
}
}
if (heartBeatID == 2) {
LOG.info("Sending FINISH_APP for application: [" + appId + "]");
this.context.getApplications().put(appId, mock(Application.class));
response.addAllApplicationsToCleanup(Collections.singletonList(appId));
}
NodeHeartbeatResponse nhResponse =
recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response);
return nhResponse;
}
}
@Before
public void clearError() {
nmStartError = null;
@ -456,6 +562,38 @@ public class TestNodeStatusUpdater {
verifyNodeStartFailure("Starting of RPC Server failed");
}
@Test
public void testApplicationKeepAlive() throws Exception {
MyNodeManager nm = new MyNodeManager();
try {
YarnConfiguration conf = createNMConfig();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
4000l);
nm.init(conf);
nm.start();
// HB 2 -> app cancelled by RM.
while (heartBeatID < 12) {
Thread.sleep(1000l);
}
MyResourceTracker3 rt =
(MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient();
rt.context.getApplications().remove(rt.appId);
Assert.assertEquals(1, rt.keepAliveRequests.size());
int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size();
LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]");
Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3);
while (heartBeatID < 20) {
Thread.sleep(1000l);
}
int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size();
Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2);
} finally {
if (nm.getServiceState() == STATE.STARTED)
nm.stop();
}
}
private void verifyNodeStartFailure(String errMessage) {
YarnConfiguration conf = createNMConfig();
nm.init(conf);

View File

@ -68,7 +68,7 @@ public class TestNonAggregatingLogHandler {
+ localLogDirs[1].getAbsolutePath();
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
DrainDispatcher dispatcher = createDispatcher(conf);
@ -142,7 +142,7 @@ public class TestNonAggregatingLogHandler {
+ localLogDirs[1].getAbsolutePath();
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l);

View File

@ -173,7 +173,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
} else {
// Inform the DelegationTokenRenewer
if (UserGroupInformation.isSecurityEnabled()) {
rmContext.getDelegationTokenRenewer().removeApplication(applicationId);
rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
}
completedApps.add(applicationId);

View File

@ -272,7 +272,8 @@ public class ResourceTrackerService extends AbstractService implements
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(), latestResponse));
remoteNodeStatus.getContainersStatuses(),
remoteNodeStatus.getKeepAliveApplications(), latestResponse));
nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
return nodeHeartBeatResponse;

View File

@ -414,7 +414,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers,
completedContainers));
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
statusEvent.getKeepAliveAppIds());
return RMNodeState.RUNNING;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -28,15 +29,17 @@ import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
public class RMNodeStatusEvent extends RMNodeEvent {
private final NodeHealthStatus nodeHealthStatus;
private List<ContainerStatus> containersCollection;
private final List<ContainerStatus> containersCollection;
private final HeartbeatResponse latestResponse;
private final List<ApplicationId> keepAliveAppIds;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
HeartbeatResponse latestResponse) {
super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus;
this.containersCollection = collection;
this.keepAliveAppIds = keepAliveAppIds;
this.latestResponse = latestResponse;
}
@ -51,4 +54,8 @@ public class RMNodeStatusEvent extends RMNodeEvent {
public HeartbeatResponse getLatestResponse() {
return this.latestResponse;
}
}
public List<ApplicationId> getKeepAliveAppIds() {
return this.keepAliveAppIds;
}
}

View File

@ -20,14 +20,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
@ -40,6 +45,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService;
/**
@ -65,7 +71,16 @@ public class DelegationTokenRenewer extends AbstractService {
// appId=>List<tokens>
private Set<DelegationTokenToRenew> delegationTokens =
Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
new ConcurrentHashMap<ApplicationId, Long>();
private long tokenRemovalDelayMs;
private Thread delayedRemovalThread;
private boolean tokenKeepAliveEnabled;
public DelegationTokenRenewer() {
super(DelegationTokenRenewer.class.getName());
}
@ -73,6 +88,12 @@ public class DelegationTokenRenewer extends AbstractService {
@Override
public synchronized void init(Configuration conf) {
super.init(conf);
this.tokenKeepAliveEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
}
@Override
@ -81,6 +102,12 @@ public class DelegationTokenRenewer extends AbstractService {
dtCancelThread.start();
renewalTimer = new Timer(true);
if (tokenKeepAliveEnabled) {
delayedRemovalThread =
new Thread(new DelayedTokenRemovalRunnable(getConfig()),
"DelayedTokenCanceller");
delayedRemovalThread.start();
}
}
@Override
@ -94,6 +121,14 @@ public class DelegationTokenRenewer extends AbstractService {
} catch (InterruptedException e) {
e.printStackTrace();
}
if (tokenKeepAliveEnabled && delayedRemovalThread != null) {
delayedRemovalThread.interrupt();
try {
delayedRemovalThread.join(1000);
} catch (InterruptedException e) {
LOG.info("Interrupted while joining on delayed removal thread.", e);
}
}
super.stop();
}
@ -343,12 +378,38 @@ public class DelegationTokenRenewer extends AbstractService {
if(t.timerTask!=null)
t.timerTask.cancel();
}
/**
* Removing delegation token for completed applications.
* @param applicationId completed application
*/
public void removeApplication(ApplicationId applicationId) {
public void applicationFinished(ApplicationId applicationId) {
if (!tokenKeepAliveEnabled) {
removeApplicationFromRenewal(applicationId);
} else {
delayedRemovalMap.put(applicationId, System.currentTimeMillis()
+ tokenRemovalDelayMs);
}
}
/**
* Add a list of applications to the keep alive list. If an appId already
* exists, update it's keep-alive time.
*
* @param appIds
* the list of applicationIds to be kept alive.
*
*/
public void updateKeepAliveApplications(List<ApplicationId> appIds) {
if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
for (ApplicationId appId : appIds) {
delayedRemovalMap.put(appId, System.currentTimeMillis()
+ tokenRemovalDelayMs);
}
}
}
private void removeApplicationFromRenewal(ApplicationId applicationId) {
synchronized (delegationTokens) {
Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
while(it.hasNext()) {
@ -371,4 +432,50 @@ public class DelegationTokenRenewer extends AbstractService {
}
}
}
/**
* Takes care of cancelling app delegation tokens after the configured
* cancellation delay, taking into consideration keep-alive requests.
*
*/
private class DelayedTokenRemovalRunnable implements Runnable {
private long waitTimeMs;
DelayedTokenRemovalRunnable(Configuration conf) {
waitTimeMs =
conf.getLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS);
}
@Override
public void run() {
List<ApplicationId> toCancel = new ArrayList<ApplicationId>();
while (!Thread.currentThread().isInterrupted()) {
Iterator<Entry<ApplicationId, Long>> it =
delayedRemovalMap.entrySet().iterator();
toCancel.clear();
while (it.hasNext()) {
Entry<ApplicationId, Long> e = it.next();
if (e.getValue() < System.currentTimeMillis()) {
toCancel.add(e.getKey());
}
}
for (ApplicationId appId : toCancel) {
removeApplicationFromRenewal(appId);
delayedRemovalMap.remove(appId);
}
synchronized (this) {
try {
wait(waitTimeMs);
} catch (InterruptedException e) {
LOG.info("Delayed Deletion Thread Interrupted. Shutting it down");
return;
}
}
}
}
}
}

View File

@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.security;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -44,6 +44,7 @@ import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Before;
@ -328,7 +329,7 @@ public class TestDelegationTokenRenewer {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplication(applicationId_1, ts, true);
delegationTokenRenewer.removeApplication(applicationId_1);
delegationTokenRenewer.applicationFinished(applicationId_1);
numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
@ -343,7 +344,7 @@ public class TestDelegationTokenRenewer {
// also renewing of the cancelled token should fail
try {
token4.renew(conf);
assertTrue("Renewal of canceled token didn't fail", false);
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {
//expected
}
@ -377,7 +378,7 @@ public class TestDelegationTokenRenewer {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplication(applicationId_1, ts, false);
delegationTokenRenewer.removeApplication(applicationId_1);
delegationTokenRenewer.applicationFinished(applicationId_1);
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
@ -393,4 +394,123 @@ public class TestDelegationTokenRenewer {
// been canceled
token1.renew(conf);
}
/**
* Basic idea of the test:
* 0. Setup token KEEP_ALIVE
* 1. create tokens.
* 2. register them for renewal - to be cancelled on app complete
* 3. Complete app.
* 4. Verify token is alive within the KEEP_ALIVE time
* 5. Verify token has been cancelled after the KEEP_ALIVE_TIME
* @throws IOException
* @throws URISyntaxException
*/
@Test
public void testDTKeepAlive1 () throws Exception {
DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
//Keep tokens alive for 6 seconds.
lconf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l);
//Try removing tokens every second.
lconf.setLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
localDtr.init(lconf);
localDtr.start();
MyFS dfs = (MyFS)FileSystem.get(lconf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
Credentials ts = new Credentials();
// get the delegation tokens
MyToken token1 = dfs.getDelegationToken(new Text("user1"));
String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
ts.addToken(new Text(nn1), token1);
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
localDtr.addApplication(applicationId_0, ts, true);
localDtr.applicationFinished(applicationId_0);
Thread.sleep(3000l);
//Token should still be around. Renewal should not fail.
token1.renew(lconf);
//Allow the keepalive time to run out
Thread.sleep(6000l);
//The token should have been cancelled at this point. Renewal will fail.
try {
token1.renew(lconf);
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {}
}
/**
* Basic idea of the test:
* 0. Setup token KEEP_ALIVE
* 1. create tokens.
* 2. register them for renewal - to be cancelled on app complete
* 3. Complete app.
* 4. Verify token is alive within the KEEP_ALIVE time
* 5. Send an explicity KEEP_ALIVE_REQUEST
* 6. Verify token KEEP_ALIVE time is renewed.
* 7. Verify token has been cancelled after the renewed KEEP_ALIVE_TIME.
* @throws IOException
* @throws URISyntaxException
*/
@Test
public void testDTKeepAlive2() throws Exception {
DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
Configuration lconf = new Configuration(conf);
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
//Keep tokens alive for 6 seconds.
lconf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l);
//Try removing tokens every second.
lconf.setLong(
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
localDtr.init(lconf);
localDtr.start();
MyFS dfs = (MyFS)FileSystem.get(lconf);
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
Credentials ts = new Credentials();
// get the delegation tokens
MyToken token1 = dfs.getDelegationToken(new Text("user1"));
String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
ts.addToken(new Text(nn1), token1);
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
localDtr.addApplication(applicationId_0, ts, true);
localDtr.applicationFinished(applicationId_0);
Thread.sleep(4000l);
//Send another keep alive.
localDtr.updateKeepAliveApplications(Collections
.singletonList(applicationId_0));
//Renewal should not fail.
token1.renew(lconf);
//Token should be around after this.
Thread.sleep(4500l);
//Renewal should not fail. - ~1.5 seconds for keepalive timeout.
token1.renew(lconf);
//Allow the keepalive time to run out
Thread.sleep(3000l);
//The token should have been cancelled at this point. Renewal will fail.
try {
token1.renew(lconf);
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {}
}
}

View File

@ -395,7 +395,7 @@ public class TestRMWebServices extends JerseyTest {
nodeHealth.setHealthReport("test health report");
nodeHealth.setIsNodeHealthy(false);
node.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeHealth,
new ArrayList<ContainerStatus>(), null));
new ArrayList<ContainerStatus>(), null, null));
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.UNHEALTHY);
JSONObject json = r.path("ws").path("v1").path("cluster").path("nodes")

View File

@ -193,6 +193,10 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | | ACLs are of for <comma-separated-users><space><comma-separated-groups>. |
| | | Defaults to special value of <<*>> which means <anyone>. |
| | | Special value of just <space> means no one has access. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.log-aggregation-enable>>> | | |
| | <false> | |
| | | Configuration to enable or disable log aggregation |
*-------------------------+-------------------------+------------------------+
@ -260,10 +264,6 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | are written. | |
| | | Multiple paths help spread disk i/o. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.log-aggregation-enable>>> | | |
| | <false> | |
| | | Configuration to enable or disable log aggregation |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.log.retain-seconds>>> | | |
| | <10800> | |
| | | Default time (in seconds) to retain log files on the NodeManager |