YARN-7952. RM should be able to recover log aggregation status after restart/fail-over. (Xuan Gong via wangda)
Change-Id: I725c9afe64831eda0aa6b0bebdbc79d2dd165707
This commit is contained in:
parent
f47659fb97
commit
4bf622043f
|
@ -1328,7 +1328,10 @@ public class YarnConfiguration extends Configuration {
|
|||
* How long for ResourceManager to wait for NodeManager to report its
|
||||
* log aggregation status. If waiting time of which the log aggregation status
|
||||
* is reported from NodeManager exceeds the configured value, RM will report
|
||||
* log aggregation status for this NodeManager as TIME_OUT
|
||||
* log aggregation status for this NodeManager as TIME_OUT.
|
||||
*
|
||||
* This configuration will be used in NodeManager as well to decide
|
||||
* whether and when to delete the cached log aggregation status.
|
||||
*/
|
||||
public static final String LOG_AGGREGATION_STATUS_TIME_OUT_MS =
|
||||
YARN_PREFIX + "log-aggregation-status.time-out.ms";
|
||||
|
|
|
@ -1286,7 +1286,9 @@
|
|||
How long for ResourceManager to wait for NodeManager to report its
|
||||
log aggregation status. If waiting time of which the log aggregation
|
||||
status is reported from NodeManager exceeds the configured value, RM
|
||||
will report log aggregation status for this NodeManager as TIME_OUT
|
||||
will report log aggregation status for this NodeManager as TIME_OUT.
|
||||
This configuration will be used in NodeManager as well to decide
|
||||
whether and when to delete the cached log aggregation status.
|
||||
</description>
|
||||
<name>yarn.log-aggregation-status.time-out.ms</name>
|
||||
<value>600000</value>
|
||||
|
|
|
@ -112,4 +112,9 @@ public abstract class RegisterNodeManagerRequest {
|
|||
* @param physicalResource Physical resources in the node.
|
||||
*/
|
||||
public abstract void setPhysicalResource(Resource physicalResource);
|
||||
|
||||
public abstract List<LogAggregationReport> getLogAggregationReportsForApps();
|
||||
|
||||
public abstract void setLogAggregationReportsForApps(
|
||||
List<LogAggregationReport> logAggregationReportsForApps);
|
||||
}
|
||||
|
|
|
@ -38,11 +38,13 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
|
||||
|
@ -57,6 +59,8 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
|||
private List<ApplicationId> runningApplications = null;
|
||||
private Set<NodeLabel> labels = null;
|
||||
|
||||
private List<LogAggregationReport> logAggregationReportsForApps = null;
|
||||
|
||||
/** Physical resources in the node. */
|
||||
private Resource physicalResource = null;
|
||||
|
||||
|
@ -100,6 +104,48 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
|||
if (this.physicalResource != null) {
|
||||
builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
|
||||
}
|
||||
if (this.logAggregationReportsForApps != null) {
|
||||
addLogAggregationStatusForAppsToProto();
|
||||
}
|
||||
}
|
||||
|
||||
private void addLogAggregationStatusForAppsToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearLogAggregationReportsForApps();
|
||||
if (this.logAggregationReportsForApps == null) {
|
||||
return;
|
||||
}
|
||||
Iterable<LogAggregationReportProto> it =
|
||||
new Iterable<LogAggregationReportProto>() {
|
||||
@Override
|
||||
public Iterator<LogAggregationReportProto> iterator() {
|
||||
return new Iterator<LogAggregationReportProto>() {
|
||||
private Iterator<LogAggregationReport> iter =
|
||||
logAggregationReportsForApps.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iter.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogAggregationReportProto next() {
|
||||
return convertToProtoFormat(iter.next());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
builder.addAllLogAggregationReportsForApps(it);
|
||||
}
|
||||
|
||||
private LogAggregationReportProto convertToProtoFormat(
|
||||
LogAggregationReport value) {
|
||||
return ((LogAggregationReportPBImpl) value).getProto();
|
||||
}
|
||||
|
||||
private synchronized void addNMContainerStatusesToProto() {
|
||||
|
@ -400,4 +446,38 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
|||
NMContainerStatus c) {
|
||||
return ((NMContainerStatusPBImpl)c).getProto();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<LogAggregationReport>
|
||||
getLogAggregationReportsForApps() {
|
||||
if (this.logAggregationReportsForApps != null) {
|
||||
return this.logAggregationReportsForApps;
|
||||
}
|
||||
initLogAggregationReportsForApps();
|
||||
return logAggregationReportsForApps;
|
||||
}
|
||||
|
||||
private void initLogAggregationReportsForApps() {
|
||||
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<LogAggregationReportProto> list =
|
||||
p.getLogAggregationReportsForAppsList();
|
||||
this.logAggregationReportsForApps = new ArrayList<LogAggregationReport>();
|
||||
for (LogAggregationReportProto c : list) {
|
||||
this.logAggregationReportsForApps.add(convertFromProtoFormat(c));
|
||||
}
|
||||
}
|
||||
|
||||
private LogAggregationReport convertFromProtoFormat(
|
||||
LogAggregationReportProto logAggregationReport) {
|
||||
return new LogAggregationReportPBImpl(logAggregationReport);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setLogAggregationReportsForApps(
|
||||
List<LogAggregationReport> logAggregationStatusForApps) {
|
||||
if(logAggregationStatusForApps == null) {
|
||||
builder.clearLogAggregationReportsForApps();
|
||||
}
|
||||
this.logAggregationReportsForApps = logAggregationStatusForApps;
|
||||
}
|
||||
}
|
|
@ -66,6 +66,7 @@ message RegisterNodeManagerRequestProto {
|
|||
repeated ApplicationIdProto runningApplications = 7;
|
||||
optional NodeLabelsProto nodeLabels = 8;
|
||||
optional ResourceProto physicalResource = 9;
|
||||
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
|
||||
}
|
||||
|
||||
message RegisterNodeManagerResponseProto {
|
||||
|
|
|
@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
|
||||
|
@ -121,6 +121,8 @@ public interface Context {
|
|||
|
||||
NMTimelinePublisher getNMTimelinePublisher();
|
||||
|
||||
NMLogAggregationStatusTracker getNMLogAggregationStatusTracker();
|
||||
|
||||
ContainerExecutor getContainerExecutor();
|
||||
|
||||
ContainerStateTransitionListener getContainerStateTransitionListener();
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||
|
@ -135,6 +136,7 @@ public class NodeManager extends CompositeService
|
|||
private boolean rmWorkPreservingRestartEnabled;
|
||||
private boolean shouldExitOnShutdownEvent = false;
|
||||
|
||||
private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
|
||||
/**
|
||||
* Default Container State transition listener.
|
||||
*/
|
||||
|
@ -424,6 +426,12 @@ public class NodeManager extends CompositeService
|
|||
addService(containerManager);
|
||||
((NMContext) context).setContainerManager(containerManager);
|
||||
|
||||
this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
|
||||
context);
|
||||
addService(nmLogAggregationStatusTracker);
|
||||
((NMContext)context).setNMLogAggregationStatusTracker(
|
||||
this.nmLogAggregationStatusTracker);
|
||||
|
||||
WebServer webServer = createWebServer(context, containerManager
|
||||
.getContainersMonitor(), this.aclsManager, dirsHandler);
|
||||
addService(webServer);
|
||||
|
@ -621,6 +629,8 @@ public class NodeManager extends CompositeService
|
|||
|
||||
private ResourcePluginManager resourcePluginManager;
|
||||
|
||||
private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
|
||||
|
||||
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInNM nmTokenSecretManager,
|
||||
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
|
||||
|
@ -862,6 +872,15 @@ public class NodeManager extends CompositeService
|
|||
public void setDeletionService(DeletionService deletionService) {
|
||||
this.deletionService = deletionService;
|
||||
}
|
||||
|
||||
public void setNMLogAggregationStatusTracker(
|
||||
NMLogAggregationStatusTracker nmLogAggregationStatusTracker) {
|
||||
this.nmLogAggregationStatusTracker = nmLogAggregationStatusTracker;
|
||||
}
|
||||
@Override
|
||||
public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
|
||||
return nmLogAggregationStatusTracker;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -965,4 +984,9 @@ public class NodeManager extends CompositeService
|
|||
public NodeStatusUpdater getNodeStatusUpdater() {
|
||||
return nodeStatusUpdater;
|
||||
}
|
||||
|
||||
private NMLogAggregationStatusTracker createNMLogAggregationStatusTracker(
|
||||
Context ctxt) {
|
||||
return new NMLogAggregationStatusTracker(ctxt);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -381,6 +381,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
if (containerReports != null) {
|
||||
LOG.info("Registering with RM using containers :" + containerReports);
|
||||
}
|
||||
if (logAggregationEnabled) {
|
||||
// pull log aggregation status for application running in this NM
|
||||
List<LogAggregationReport> logAggregationReports =
|
||||
context.getNMLogAggregationStatusTracker()
|
||||
.pullCachedLogAggregationReports();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The cache log aggregation status size:"
|
||||
+ logAggregationReports.size());
|
||||
}
|
||||
if (logAggregationReports != null
|
||||
&& !logAggregationReports.isEmpty()) {
|
||||
request.setLogAggregationReportsForApps(logAggregationReports);
|
||||
}
|
||||
}
|
||||
regNMResponse =
|
||||
resourceTracker.registerNodeManager(request);
|
||||
// Make sure rmIdentifier is set before we release the lock
|
||||
|
|
|
@ -385,7 +385,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||
logAggregationSucceedInThisCycle
|
||||
? LogAggregationStatus.RUNNING
|
||||
: LogAggregationStatus.RUNNING_WITH_FAILURE;
|
||||
sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage);
|
||||
sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage,
|
||||
false);
|
||||
if (appFinished) {
|
||||
// If the app is finished, one extra final report with log aggregation
|
||||
// status SUCCEEDED/FAILED will be sent to RM to inform the RM
|
||||
|
@ -394,18 +395,22 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||
renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
|
||||
? LogAggregationStatus.FAILED
|
||||
: LogAggregationStatus.SUCCEEDED;
|
||||
sendLogAggregationReportInternal(finalLogAggregationStatus, "");
|
||||
sendLogAggregationReportInternal(finalLogAggregationStatus, "", true);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendLogAggregationReportInternal(
|
||||
LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
|
||||
LogAggregationStatus logAggregationStatus, String diagnosticMessage,
|
||||
boolean finalized) {
|
||||
LogAggregationReport report =
|
||||
Records.newRecord(LogAggregationReport.class);
|
||||
report.setApplicationId(appId);
|
||||
report.setDiagnosticMessage(diagnosticMessage);
|
||||
report.setLogAggregationStatus(logAggregationStatus);
|
||||
this.context.getLogAggregationStatusForApps().add(report);
|
||||
this.context.getNMLogAggregationStatusTracker().updateLogAggregationStatus(
|
||||
appId, logAggregationStatus, System.currentTimeMillis(),
|
||||
diagnosticMessage, finalized);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -0,0 +1,270 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* {@link NMLogAggregationStatusTracker} is used to cache log aggregation
|
||||
* status for finished applications. It will also delete the old cached
|
||||
* log aggregation status periodically.
|
||||
*
|
||||
*/
|
||||
public class NMLogAggregationStatusTracker extends CompositeService {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(NMLogAggregationStatusTracker.class);
|
||||
|
||||
private final ReadLock readLocker;
|
||||
private final WriteLock writeLocker;
|
||||
private final Context nmContext;
|
||||
private final long rollingInterval;
|
||||
private final Timer timer;
|
||||
private final Map<ApplicationId, AppLogAggregationStatusForRMRecovery>
|
||||
recoveryStatuses;
|
||||
private boolean disabled = false;
|
||||
|
||||
public NMLogAggregationStatusTracker(Context context) {
|
||||
super(NMLogAggregationStatusTracker.class.getName());
|
||||
this.nmContext = context;
|
||||
Configuration conf = context.getConf();
|
||||
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
|
||||
disabled = true;
|
||||
}
|
||||
this.recoveryStatuses = new ConcurrentHashMap<>();
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
this.readLocker = lock.readLock();
|
||||
this.writeLocker = lock.writeLock();
|
||||
this.timer = new Timer();
|
||||
long configuredRollingInterval = conf.getLong(
|
||||
YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
|
||||
if (configuredRollingInterval <= 0) {
|
||||
this.rollingInterval = YarnConfiguration
|
||||
.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
|
||||
LOG.warn("The configured log-aggregation-status.time-out.ms is "
|
||||
+ configuredRollingInterval + " which should be larger than 0. "
|
||||
+ "Using the default value:" + this.rollingInterval + " instead.");
|
||||
} else {
|
||||
this.rollingInterval = configuredRollingInterval;
|
||||
}
|
||||
LOG.info("the rolling interval seconds for the NodeManager Cached Log "
|
||||
+ "aggregation status is " + (rollingInterval/1000));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
if (disabled) {
|
||||
LOG.warn("Log Aggregation is disabled."
|
||||
+ "So is the LogAggregationStatusTracker.");
|
||||
} else {
|
||||
this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(),
|
||||
rollingInterval, rollingInterval);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStop() throws Exception {
|
||||
this.timer.cancel();
|
||||
}
|
||||
|
||||
public void updateLogAggregationStatus(ApplicationId appId,
|
||||
LogAggregationStatus logAggregationStatus, long updateTime,
|
||||
String diagnosis, boolean finalized) {
|
||||
if (disabled) {
|
||||
LOG.warn("The log aggregation is diabled. No need to update "
|
||||
+ "the log aggregation status");
|
||||
}
|
||||
// In NM, each application has exactly one appLogAggregator thread
|
||||
// to handle the log aggregation. So, it is fine which multiple
|
||||
// appLogAggregator thread to update log aggregation status for its
|
||||
// own application. This is why we are using readLocker here.
|
||||
this.readLocker.lock();
|
||||
try {
|
||||
AppLogAggregationStatusForRMRecovery tracker = recoveryStatuses
|
||||
.get(appId);
|
||||
if (tracker == null) {
|
||||
Application application = this.nmContext.getApplications().get(appId);
|
||||
if (application == null) {
|
||||
LOG.warn("The application:" + appId + " has already finished,"
|
||||
+ " and has been removed from NodeManager, we should not "
|
||||
+ "receive the log aggregation status update for "
|
||||
+ "this application.");
|
||||
return;
|
||||
}
|
||||
AppLogAggregationStatusForRMRecovery newTracker =
|
||||
new AppLogAggregationStatusForRMRecovery(logAggregationStatus,
|
||||
diagnosis);
|
||||
newTracker.setLastModifiedTime(updateTime);
|
||||
newTracker.setFinalized(finalized);
|
||||
recoveryStatuses.put(appId, newTracker);
|
||||
} else {
|
||||
if (tracker.isFinalized()) {
|
||||
LOG.warn("Ignore the log aggregation status update request "
|
||||
+ "for the application:" + appId + ". The cached log aggregation "
|
||||
+ "status is " + tracker.getLogAggregationStatus() + ".");
|
||||
} else {
|
||||
if (tracker.getLastModifiedTime() > updateTime) {
|
||||
LOG.warn("Ignore the log aggregation status update request "
|
||||
+ "for the application:" + appId + ". The request log "
|
||||
+ "aggregation status update is older than the cached "
|
||||
+ "log aggregation status.");
|
||||
} else {
|
||||
tracker.setLogAggregationStatus(logAggregationStatus);
|
||||
tracker.setDiagnosis(diagnosis);
|
||||
tracker.setLastModifiedTime(updateTime);
|
||||
tracker.setFinalized(finalized);
|
||||
recoveryStatuses.put(appId, tracker);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.readLocker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public List<LogAggregationReport> pullCachedLogAggregationReports() {
|
||||
List<LogAggregationReport> reports = new ArrayList<>();
|
||||
if (disabled) {
|
||||
LOG.warn("The log aggregation is diabled."
|
||||
+ "There is no cached log aggregation status.");
|
||||
return reports;
|
||||
}
|
||||
// When we pull cached Log aggregation reports for all application in
|
||||
// this NM, we should make sure that we need to block all of the
|
||||
// updateLogAggregationStatus calls. So, the writeLocker is used here.
|
||||
this.writeLocker.lock();
|
||||
try {
|
||||
for(Entry<ApplicationId, AppLogAggregationStatusForRMRecovery> tracker :
|
||||
recoveryStatuses.entrySet()) {
|
||||
AppLogAggregationStatusForRMRecovery current = tracker.getValue();
|
||||
LogAggregationReport report = LogAggregationReport.newInstance(
|
||||
tracker.getKey(), current.getLogAggregationStatus(),
|
||||
current.getDiagnosis());
|
||||
reports.add(report);
|
||||
}
|
||||
return reports;
|
||||
} finally {
|
||||
this.writeLocker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private class LogAggregationStatusRoller extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
rollLogAggregationStatus();
|
||||
}
|
||||
}
|
||||
|
||||
private void rollLogAggregationStatus() {
|
||||
// When we call rollLogAggregationStatus, basically fetch all
|
||||
// cached log aggregation status and delete the out-of-timeout period
|
||||
// log aggregation status, we should block the rollLogAggregationStatus
|
||||
// calls as well as pullCachedLogAggregationReports call. So, the
|
||||
// writeLocker is used here.
|
||||
this.writeLocker.lock();
|
||||
try {
|
||||
long currentTimeStamp = System.currentTimeMillis();
|
||||
LOG.info("Rolling over the cached log aggregation status.");
|
||||
Iterator<Entry<ApplicationId, AppLogAggregationStatusForRMRecovery>> it
|
||||
= recoveryStatuses.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
Entry<ApplicationId, AppLogAggregationStatusForRMRecovery> tracker =
|
||||
it.next();
|
||||
// the application has finished.
|
||||
if (nmContext.getApplications().get(tracker.getKey()) == null) {
|
||||
if (currentTimeStamp - tracker.getValue().getLastModifiedTime()
|
||||
> rollingInterval) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.writeLocker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private static class AppLogAggregationStatusForRMRecovery {
|
||||
private LogAggregationStatus logAggregationStatus;
|
||||
private long lastModifiedTime;
|
||||
private boolean finalized;
|
||||
private String diagnosis;
|
||||
|
||||
AppLogAggregationStatusForRMRecovery(
|
||||
LogAggregationStatus logAggregationStatus, String diagnosis) {
|
||||
this.setLogAggregationStatus(logAggregationStatus);
|
||||
this.setDiagnosis(diagnosis);
|
||||
}
|
||||
|
||||
public LogAggregationStatus getLogAggregationStatus() {
|
||||
return logAggregationStatus;
|
||||
}
|
||||
|
||||
public void setLogAggregationStatus(
|
||||
LogAggregationStatus logAggregationStatus) {
|
||||
this.logAggregationStatus = logAggregationStatus;
|
||||
}
|
||||
|
||||
public long getLastModifiedTime() {
|
||||
return lastModifiedTime;
|
||||
}
|
||||
|
||||
public void setLastModifiedTime(long lastModifiedTime) {
|
||||
this.lastModifiedTime = lastModifiedTime;
|
||||
}
|
||||
|
||||
public boolean isFinalized() {
|
||||
return finalized;
|
||||
}
|
||||
|
||||
public void setFinalized(boolean finalized) {
|
||||
this.finalized = finalized;
|
||||
}
|
||||
|
||||
public String getDiagnosis() {
|
||||
return diagnosis;
|
||||
}
|
||||
|
||||
public void setDiagnosis(String diagnosis) {
|
||||
this.diagnosis = diagnosis;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
|
@ -814,5 +815,10 @@ public abstract class BaseAMRMProxyTest {
|
|||
public DeletionService getDeletionService() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,14 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -48,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -77,6 +85,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
|
@ -130,6 +139,16 @@ public abstract class BaseContainerManagerTest {
|
|||
public ContainerExecutor getContainerExecutor() {
|
||||
return exec;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
|
||||
NMLogAggregationStatusTracker mock = mock(
|
||||
NMLogAggregationStatusTracker.class);
|
||||
doNothing().when(mock).updateLogAggregationStatus(
|
||||
any(ApplicationId.class), any(LogAggregationStatus.class),
|
||||
anyLong(), anyString(), anyBoolean());
|
||||
return mock;
|
||||
}
|
||||
};
|
||||
protected ContainerExecutor exec;
|
||||
protected DeletionService delSrvc;
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Function test for {@link NMLogAggregationStatusTracker}.
|
||||
*
|
||||
*/
|
||||
public class TestNMLogAggregationStatusTracker {
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
@Test
|
||||
public void testNMLogAggregationStatusUpdate() {
|
||||
long baseTime = System.currentTimeMillis();
|
||||
Context mockContext = mock(Context.class);
|
||||
ConcurrentMap<ApplicationId, Application> apps = new ConcurrentHashMap<>();
|
||||
ApplicationId appId1 = ApplicationId.newInstance(
|
||||
System.currentTimeMillis(), 1);
|
||||
apps.putIfAbsent(appId1, mock(Application.class));
|
||||
when(mockContext.getApplications()).thenReturn(apps);
|
||||
// the log aggregation is disabled.
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
|
||||
when(mockContext.getConf()).thenReturn(conf);
|
||||
NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker(
|
||||
mockContext);
|
||||
ApplicationId appId0 = ApplicationId.newInstance(0, 0);
|
||||
tracker.updateLogAggregationStatus(appId0,
|
||||
LogAggregationStatus.RUNNING, System.currentTimeMillis(), "", false);
|
||||
List<LogAggregationReport> reports = tracker
|
||||
.pullCachedLogAggregationReports();
|
||||
// we can not get any cached log aggregation status because
|
||||
// the log aggregation is disabled.
|
||||
Assert.assertTrue("No cached log aggregation status because "
|
||||
+ "log aggregation is disabled.", reports.isEmpty());
|
||||
|
||||
// enable the log aggregation.
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
when(mockContext.getConf()).thenReturn(conf);
|
||||
tracker = new NMLogAggregationStatusTracker(mockContext);
|
||||
// update the log aggregation status for an un-existed/finished
|
||||
// application, we should ignore the status update request.
|
||||
appId0 = ApplicationId.newInstance(0, 0);
|
||||
tracker.updateLogAggregationStatus(appId0,
|
||||
LogAggregationStatus.RUNNING, baseTime, "", false);
|
||||
reports = tracker
|
||||
.pullCachedLogAggregationReports();
|
||||
Assert.assertTrue("No cached log aggregation status "
|
||||
+ "because the application is finished or not existed.",
|
||||
reports.isEmpty());
|
||||
|
||||
tracker.updateLogAggregationStatus(appId1,
|
||||
LogAggregationStatus.RUNNING, baseTime, "", false);
|
||||
reports = tracker
|
||||
.pullCachedLogAggregationReports();
|
||||
Assert.assertEquals("Should have one cached log aggregation status.",
|
||||
1, reports.size());
|
||||
Assert.assertEquals("The cached log aggregation status should be RUNNING.",
|
||||
LogAggregationStatus.RUNNING,
|
||||
reports.get(0).getLogAggregationStatus());
|
||||
|
||||
tracker.updateLogAggregationStatus(appId1,
|
||||
LogAggregationStatus.SUCCEEDED, baseTime + 60 * 1000, "", true);
|
||||
reports = tracker
|
||||
.pullCachedLogAggregationReports();
|
||||
Assert.assertEquals(1, reports.size());
|
||||
Assert.assertEquals("Update cached log aggregation status to SUCCEEDED",
|
||||
LogAggregationStatus.SUCCEEDED,
|
||||
reports.get(0).getLogAggregationStatus());
|
||||
|
||||
// the log aggregation status is finalized. So, we would
|
||||
// ingore the following update
|
||||
tracker.updateLogAggregationStatus(appId1,
|
||||
LogAggregationStatus.FAILED, baseTime + 10 * 60 * 1000, "", true);
|
||||
reports = tracker
|
||||
.pullCachedLogAggregationReports();
|
||||
Assert.assertEquals(1, reports.size());
|
||||
Assert.assertEquals("The cached log aggregation status "
|
||||
+ "should be still SUCCEEDED.", LogAggregationStatus.SUCCEEDED,
|
||||
reports.get(0).getLogAggregationStatus());
|
||||
}
|
||||
|
||||
public void testLogAggregationStatusRoller() throws Exception {
|
||||
Context mockContext = mock(Context.class);
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
||||
10 * 1000);
|
||||
when(mockContext.getConf()).thenReturn(conf);
|
||||
ConcurrentMap<ApplicationId, Application> apps = new ConcurrentHashMap<>();
|
||||
ApplicationId appId1 = ApplicationId.newInstance(
|
||||
System.currentTimeMillis(), 1);
|
||||
apps.putIfAbsent(appId1, mock(Application.class));
|
||||
when(mockContext.getApplications()).thenReturn(apps);
|
||||
final NMLogAggregationStatusTracker tracker =
|
||||
new NMLogAggregationStatusTracker(mockContext);
|
||||
tracker.updateLogAggregationStatus(appId1,
|
||||
LogAggregationStatus.RUNNING,
|
||||
System.currentTimeMillis(), "", false);
|
||||
// verify that we have cached the log aggregation status for app1
|
||||
List<LogAggregationReport> reports = tracker
|
||||
.pullCachedLogAggregationReports();
|
||||
Assert.assertEquals("Should have one cached log aggregation status.",
|
||||
1, reports.size());
|
||||
Assert.assertEquals("The cached log aggregation status should be RUNNING.",
|
||||
LogAggregationStatus.RUNNING,
|
||||
reports.get(0).getLogAggregationStatus());
|
||||
// wait for 10s
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
List<LogAggregationReport>reports = tracker
|
||||
.pullCachedLogAggregationReports();
|
||||
return reports.size() == 0;
|
||||
}
|
||||
}, 2000, 10000);
|
||||
}
|
||||
}
|
|
@ -399,9 +399,21 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
|
||||
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
|
||||
if (oldNode == null) {
|
||||
RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
|
||||
request.getNMContainerStatuses(),
|
||||
request.getRunningApplications());
|
||||
if (request.getLogAggregationReportsForApps() != null
|
||||
&& !request.getLogAggregationReportsForApps().isEmpty()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found the number of previous cached log aggregation "
|
||||
+ "status from nodemanager:" + nodeId + " is :"
|
||||
+ request.getLogAggregationReportsForApps().size());
|
||||
}
|
||||
startEvent.setLogAggregationReportsForApps(request
|
||||
.getLogAggregationReportsForApps());
|
||||
}
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
|
||||
request.getRunningApplications()));
|
||||
startEvent);
|
||||
} else {
|
||||
LOG.info("Reconnect from the node at: " + host);
|
||||
this.nmLivelinessMonitor.unregister(nodeId);
|
||||
|
@ -426,7 +438,6 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
this.rmContext.getRMNodes().put(nodeId, rmNode);
|
||||
this.rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMNodeStartedEvent(nodeId, null, null));
|
||||
|
||||
} else {
|
||||
// Reset heartbeat ID since node just restarted.
|
||||
oldNode.resetLastNodeHeartBeatResponse();
|
||||
|
|
|
@ -866,6 +866,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodesListManagerEvent(
|
||||
NodesListManagerEventType.NODE_USABLE, rmNode));
|
||||
List<LogAggregationReport> logAggregationReportsForApps =
|
||||
startEvent.getLogAggregationReportsForApps();
|
||||
if (logAggregationReportsForApps != null
|
||||
&& !logAggregationReportsForApps.isEmpty()) {
|
||||
rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,12 +22,14 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
|
||||
public class RMNodeStartedEvent extends RMNodeEvent {
|
||||
|
||||
private List<NMContainerStatus> containerStatuses;
|
||||
private List<ApplicationId> runningApplications;
|
||||
private List<LogAggregationReport> logAggregationReportsForApps;
|
||||
|
||||
public RMNodeStartedEvent(NodeId nodeId,
|
||||
List<NMContainerStatus> containerReports,
|
||||
|
@ -44,4 +46,13 @@ public class RMNodeStartedEvent extends RMNodeEvent {
|
|||
public List<ApplicationId> getRunningApplications() {
|
||||
return runningApplications;
|
||||
}
|
||||
|
||||
public List<LogAggregationReport> getLogAggregationReportsForApps() {
|
||||
return this.logAggregationReportsForApps;
|
||||
}
|
||||
|
||||
public void setLogAggregationReportsForApps(
|
||||
List<LogAggregationReport> logAggregationReportsForApps) {
|
||||
this.logAggregationReportsForApps = logAggregationReportsForApps;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue