YARN-953. Changed ResourceManager to start writing history data. Contributed by Zhijie Shen.

svn merge --ignore-ancestry -c 1556738 ../YARN-321


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562192 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-01-28 19:56:28 +00:00
parent 0d2e62ae04
commit 0f87e4928b
41 changed files with 1572 additions and 52 deletions

View File

@ -493,6 +493,9 @@ Branch YARN-321: Generic ApplicationHistoryService
YARN-987. Added ApplicationHistoryManager responsible for exposing reports to
all clients. (Mayank Bansal via vinodkv)
YARN-953. Changed ResourceManager to start writing history data. (Zhijie Shen
via vinodkv)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -263,6 +263,22 @@ public class YarnConfiguration extends Configuration {
RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000;
/** The setting that controls whether RM writes history data. */
public static final String RM_HISTORY_WRITER_ENABLED = RM_PREFIX
+ "history-writer.enabled";
public static final boolean DEFAULT_RM_HISTORY_WRITER_ENABLED = false;
/** Number of worker threads that write the history data. */
public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";
public static final int DEFAULT_RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
10;
/** The implementation class of ApplicationHistoryStore, which is to be used
* by RMApplicationHistoryWriter. */
public static final String RM_HISTORY_WRITER_CLASS = RM_PREFIX
+ "history-writer.class";
//Delegation token related keys
public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
RM_PREFIX + "delegation.key.update-interval";

View File

@ -564,6 +564,27 @@
<value>org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy</value>
</property>
<property>
<description>Enable RM to write history data. If true, then
yarn.resourcemanager.history-writer.class must be specified</description>
<name>yarn.resourcemanager.history-writer.enabled</name>
<value>false</value>
</property>
<property>
<description>Number of worker threads that write the history data.</description>
<name>yarn.resourcemanager.history-writer.multi-threaded-dispatcher.pool-size</name>
<value>10</value>
</property>
<property>
<description>The implementation class of ApplicationHistoryStore, which is
to be used by RMApplicationHistoryWriter.
</description>
<name>yarn.resourcemanager.history-writer.class</name>
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.NullApplicationHistoryStore</value>
</property>
<!-- Node Manager Configs -->
<property>
<description>The hostname of the NM.</description>
@ -1047,7 +1068,7 @@
<description>URI pointing to the location of the FileSystem path where
the history will be persisted. This must be supplied when using
org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore
as the value for yarn.resourcemanager.ahs.writer.class</description>
as the value for yarn.resourcemanager.history-writer.store.class</description>
<name>yarn.ahs.fs-history-store.uri</name>
<value>${hadoop.log.dir}/yarn/system/ahstore</value>
</property>

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
/**
* Dummy implementation of {@link ApplicationHistoryStore}. If this
* implementation is used, no history data will be persisted.
*
*/
@Unstable
@Private
public class NullApplicationHistoryStore extends AbstractService implements
ApplicationHistoryStore {
public NullApplicationHistoryStore() {
super(NullApplicationHistoryStore.class.getName());
}
@Override
public void applicationStarted(ApplicationStartData appStart)
throws IOException {
}
@Override
public void applicationFinished(ApplicationFinishData appFinish)
throws IOException {
}
@Override
public void applicationAttemptStarted(
ApplicationAttemptStartData appAttemptStart) throws IOException {
}
@Override
public void applicationAttemptFinished(
ApplicationAttemptFinishData appAttemptFinish) throws IOException {
}
@Override
public void containerStarted(ContainerStartData containerStart)
throws IOException {
}
@Override
public void containerFinished(ContainerFinishData containerFinish)
throws IOException {
}
@Override
public ApplicationHistoryData getApplication(ApplicationId appId)
throws IOException {
return null;
}
@Override
public Map<ApplicationId, ApplicationHistoryData> getAllApplications()
throws IOException {
return Collections.emptyMap();
}
@Override
public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
getApplicationAttempts(ApplicationId appId) throws IOException {
return Collections.emptyMap();
}
@Override
public ApplicationAttemptHistoryData getApplicationAttempt(
ApplicationAttemptId appAttemptId) throws IOException {
return null;
}
@Override
public ContainerHistoryData getContainer(ContainerId containerId)
throws IOException {
return null;
}
@Override
public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
throws IOException {
return null;
}
@Override
public Map<ContainerId, ContainerHistoryData> getContainers(
ApplicationAttemptId appAttemptId) throws IOException {
return Collections.emptyMap();
}
}

View File

@ -148,7 +148,7 @@ public void setQueue(String queue) {
@Override
public long getSubmitTime() {
ApplicationStartDataProtoOrBuilder p = viaProto ? proto : builder;
return p.getStartTime();
return p.getSubmitTime();
}
@Override

View File

@ -183,6 +183,11 @@
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@ -33,8 +34,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
/**
@ -90,4 +91,10 @@ public interface RMContext {
void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager);
RMApplicationHistoryWriter getRMApplicationHistoryWriter();
void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter);
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -76,6 +77,7 @@ public class RMContextImpl implements RMContext {
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
/**
* Default constructor. To be used in conjunction with setter methods for
@ -95,7 +97,8 @@ public RMContextImpl(Dispatcher rmDispatcher,
AMRMTokenSecretManager appTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this();
this.setDispatcher(rmDispatcher);
this.setContainerAllocationExpirer(containerAllocationExpirer);
@ -106,6 +109,7 @@ public RMContextImpl(Dispatcher rmDispatcher,
this.setContainerTokenSecretManager(containerTokenSecretManager);
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
@ -318,4 +322,16 @@ public HAServiceState getHAServiceState() {
return haServiceState;
}
}
@Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return rmApplicationHistoryWriter;
}
@Override
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
}

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
@ -261,6 +262,10 @@ protected RMAppManager createRMAppManager() {
this.applicationACLsManager, this.conf);
}
protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
return new RMApplicationHistoryWriter();
}
// sanity check for configurations
protected static void validateConfigs(Configuration conf) {
// validate max-attempts
@ -345,6 +350,11 @@ protected void serviceInit(Configuration configuration) throws Exception {
rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
}
RMApplicationHistoryWriter rmApplicationHistoryWriter =
createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);

View File

@ -0,0 +1,334 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.applicationhistoryservice.NullApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* <p>
* {@link ResourceManager} uses this class to write the information of
* {@link RMApp}, {@link RMAppAttempt} and {@link RMContainer}. These APIs are
* non-blocking, and just schedule a writing history event. An self-contained
* dispatcher vector will handle the event in separate threads, and extract the
* required fields that are going to be persisted. Then, the extracted
* information will be persisted via the implementation of
* {@link ApplicationHistoryStore}.
* </p>
*/
@Private
@Unstable
public class RMApplicationHistoryWriter extends CompositeService {
public static final Log LOG =
LogFactory.getLog(RMApplicationHistoryWriter.class);
private Dispatcher dispatcher;
private ApplicationHistoryWriter writer;
public RMApplicationHistoryWriter() {
super(RMApplicationHistoryWriter.class.getName());
}
@Override
protected synchronized void serviceInit(
Configuration conf) throws Exception {
writer = createApplicationHistoryStore(conf);
addIfService(writer);
dispatcher = createDispatcher(conf);
dispatcher.register(
WritingHistoryEventType.class, new ForwardingEventHandler());
addIfService(dispatcher);
super.serviceInit(conf);
}
protected Dispatcher createDispatcher(Configuration conf) {
MultiThreadedDispatcher dispatcher = new MultiThreadedDispatcher(conf.getInt(
YarnConfiguration.RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE,
YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE));
dispatcher.setDrainEventsOnStop();
return dispatcher;
}
protected ApplicationHistoryStore createApplicationHistoryStore(
Configuration conf) {
boolean ahsEnabled = conf.getBoolean(
YarnConfiguration.RM_HISTORY_WRITER_ENABLED,
YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_ENABLED);
// If the history writer is not enabled, a dummy store will be used to
// write nothing
if (ahsEnabled) {
try {
Class<? extends ApplicationHistoryStore> storeClass =
conf.getClass(YarnConfiguration.RM_HISTORY_WRITER_CLASS,
NullApplicationHistoryStore.class,
ApplicationHistoryStore.class);
return storeClass.newInstance();
} catch (Exception e) {
String msg = "Could not instantiate ApplicationHistoryWriter: " +
conf.get(YarnConfiguration.RM_HISTORY_WRITER_CLASS,
NullApplicationHistoryStore.class.getName());
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
} else {
return new NullApplicationHistoryStore();
}
}
protected void handleWritingApplicationHistoryEvent(
WritingApplicationHistoryEvent event) {
switch (event.getType()) {
case APP_START:
WritingApplicationStartEvent wasEvent =
(WritingApplicationStartEvent) event;
try {
writer.applicationStarted(wasEvent.getApplicationStartData());
LOG.info("Stored the start data of application "
+ wasEvent.getApplicationId());
} catch (IOException e) {
LOG.error("Error when storing the start data of application "
+ wasEvent.getApplicationId());
}
break;
case APP_FINISH:
WritingApplicationFinishEvent wafEvent =
(WritingApplicationFinishEvent) event;
try {
writer.applicationFinished(wafEvent.getApplicationFinishData());
LOG.info("Stored the finish data of application "
+ wafEvent.getApplicationId());
} catch (IOException e) {
LOG.error("Error when storing the finish data of application "
+ wafEvent.getApplicationId());
}
break;
case APP_ATTEMPT_START:
WritingApplicationAttemptStartEvent waasEvent =
(WritingApplicationAttemptStartEvent) event;
try {
writer.applicationAttemptStarted(waasEvent
.getApplicationAttemptStartData());
LOG.info("Stored the start data of application attempt "
+ waasEvent.getApplicationAttemptId());
} catch (IOException e) {
LOG.error("Error when storing the start data of application attempt "
+ waasEvent.getApplicationAttemptId());
}
break;
case APP_ATTEMPT_FINISH:
WritingApplicationAttemptFinishEvent waafEvent =
(WritingApplicationAttemptFinishEvent) event;
try {
writer.applicationAttemptFinished(waafEvent
.getApplicationAttemptFinishData());
LOG.info("Stored the finish data of application attempt "
+ waafEvent.getApplicationAttemptId());
} catch (IOException e) {
LOG.error("Error when storing the finish data of application attempt "
+ waafEvent.getApplicationAttemptId());
}
break;
case CONTAINER_START:
WritingContainerStartEvent wcsEvent =
(WritingContainerStartEvent) event;
try {
writer.containerStarted(wcsEvent.getContainerStartData());
LOG.info("Stored the start data of container "
+ wcsEvent.getContainerId());
} catch (IOException e) {
LOG.error("Error when storing the start data of container "
+ wcsEvent.getContainerId());
}
break;
case CONTAINER_FINISH:
WritingContainerFinishEvent wcfEvent =
(WritingContainerFinishEvent) event;
try {
writer.containerFinished(wcfEvent.getContainerFinishData());
LOG.info("Stored the finish data of container "
+ wcfEvent.getContainerId());
} catch (IOException e) {
LOG.error("Error when storing the finish data of container "
+ wcfEvent.getContainerId());
}
break;
default:
LOG.error("Unknown WritingApplicationHistoryEvent type: "
+ event.getType());
}
}
@SuppressWarnings("unchecked")
public void applicationStarted(RMApp app) {
dispatcher.getEventHandler().handle(
new WritingApplicationStartEvent(app.getApplicationId(),
ApplicationStartData.newInstance(app.getApplicationId(),
app.getName(), app.getApplicationType(), app.getQueue(),
app.getUser(), app.getSubmitTime(), app.getStartTime())));
}
@SuppressWarnings("unchecked")
public void applicationFinished(RMApp app) {
dispatcher.getEventHandler().handle(
new WritingApplicationFinishEvent(app.getApplicationId(),
ApplicationFinishData.newInstance(app.getApplicationId(),
app.getFinishTime(),
app.getDiagnostics().toString(),
app.getFinalApplicationStatus(),
app.createApplicationState())));
}
@SuppressWarnings("unchecked")
public void applicationAttemptStarted(RMAppAttempt appAttempt) {
dispatcher.getEventHandler().handle(
new WritingApplicationAttemptStartEvent(appAttempt.getAppAttemptId(),
ApplicationAttemptStartData.newInstance(
appAttempt.getAppAttemptId(), appAttempt.getHost(),
appAttempt.getRpcPort(), appAttempt.getMasterContainer()
.getId())));
}
@SuppressWarnings("unchecked")
public void applicationAttemptFinished(RMAppAttempt appAttempt) {
dispatcher.getEventHandler().handle(
new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(),
ApplicationAttemptFinishData.newInstance(appAttempt
.getAppAttemptId(), appAttempt.getDiagnostics().toString(),
appAttempt.getTrackingUrl(), appAttempt
.getFinalApplicationStatus(), appAttempt
.createApplicationAttemptState())));
}
@SuppressWarnings("unchecked")
public void containerStarted(RMContainer container) {
dispatcher.getEventHandler().handle(
new WritingContainerStartEvent(container.getContainerId(),
ContainerStartData.newInstance(container.getContainerId(),
container.getAllocatedResource(), container.getAllocatedNode(),
container.getAllocatedPriority(), container.getStartTime())));
}
@SuppressWarnings("unchecked")
public void containerFinished(RMContainer container) {
dispatcher.getEventHandler().handle(
new WritingContainerFinishEvent(container.getContainerId(),
ContainerFinishData.newInstance(container.getContainerId(),
container.getFinishTime(), container.getDiagnosticsInfo(),
container.getLogURL(), container.getContainerExitStatus(),
container.getContainerState())));
}
/**
* EventHandler implementation which forward events to HistoryWriter Making
* use of it, HistoryWriter can avoid to have a public handle method
*/
private final class ForwardingEventHandler
implements EventHandler<WritingApplicationHistoryEvent> {
@Override
public void handle(WritingApplicationHistoryEvent event) {
handleWritingApplicationHistoryEvent(event);
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected static class MultiThreadedDispatcher
extends CompositeService implements Dispatcher {
private List<AsyncDispatcher> dispatchers =
new ArrayList<AsyncDispatcher>();
public MultiThreadedDispatcher(int num) {
super(MultiThreadedDispatcher.class.getName());
for (int i = 0; i < num; ++i) {
AsyncDispatcher dispatcher = createDispatcher();
dispatchers.add(dispatcher);
addIfService(dispatcher);
}
}
@Override
public EventHandler getEventHandler() {
return new CompositEventHandler();
}
@Override
public void register(Class<? extends Enum> eventType, EventHandler handler) {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.register(eventType, handler);
}
}
public void setDrainEventsOnStop() {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.setDrainEventsOnStop();
}
}
private class CompositEventHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
// Use hashCode (of ApplicationId) to dispatch the event to the child
// dispatcher, such that all the writing events of one application will
// be handled by one thread, the scheduled order of the these events
// will be preserved
int index = Math.abs(event.hashCode()) % dispatchers.size();
dispatchers.get(index).getEventHandler().handle(event);
}
}
protected AsyncDispatcher createDispatcher() {
return new AsyncDispatcher();
}
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
public class WritingApplicationAttemptFinishEvent extends
WritingApplicationHistoryEvent {
private ApplicationAttemptId appAttemptId;
private ApplicationAttemptFinishData appAttemptFinish;
public WritingApplicationAttemptFinishEvent(
ApplicationAttemptId appAttemptId,
ApplicationAttemptFinishData appAttemptFinish) {
super(WritingHistoryEventType.APP_ATTEMPT_FINISH);
this.appAttemptId = appAttemptId;
this.appAttemptFinish = appAttemptFinish;
}
@Override
public int hashCode() {
return appAttemptId.getApplicationId().hashCode();
}
public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId;
}
public ApplicationAttemptFinishData getApplicationAttemptFinishData() {
return appAttemptFinish;
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
public class WritingApplicationAttemptStartEvent extends
WritingApplicationHistoryEvent {
private ApplicationAttemptId appAttemptId;
private ApplicationAttemptStartData appAttemptStart;
public WritingApplicationAttemptStartEvent(ApplicationAttemptId appAttemptId,
ApplicationAttemptStartData appAttemptStart) {
super(WritingHistoryEventType.APP_ATTEMPT_START);
this.appAttemptId = appAttemptId;
this.appAttemptStart = appAttemptStart;
}
@Override
public int hashCode() {
return appAttemptId.getApplicationId().hashCode();
}
public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId;
}
public ApplicationAttemptStartData getApplicationAttemptStartData() {
return appAttemptStart;
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
public class WritingApplicationFinishEvent extends
WritingApplicationHistoryEvent {
private ApplicationId appId;
private ApplicationFinishData appFinish;
public WritingApplicationFinishEvent(ApplicationId appId,
ApplicationFinishData appFinish) {
super(WritingHistoryEventType.APP_FINISH);
this.appId = appId;
this.appFinish = appFinish;
}
@Override
public int hashCode() {
return appId.hashCode();
}
public ApplicationId getApplicationId() {
return appId;
}
public ApplicationFinishData getApplicationFinishData() {
return appFinish;
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class WritingApplicationHistoryEvent extends
AbstractEvent<WritingHistoryEventType> {
public WritingApplicationHistoryEvent(WritingHistoryEventType type) {
super(type);
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
public class WritingApplicationStartEvent extends
WritingApplicationHistoryEvent {
private ApplicationId appId;
private ApplicationStartData appStart;
public WritingApplicationStartEvent(ApplicationId appId,
ApplicationStartData appStart) {
super(WritingHistoryEventType.APP_START);
this.appId = appId;
this.appStart = appStart;
}
@Override
public int hashCode() {
return appId.hashCode();
}
public ApplicationId getApplicationId() {
return appId;
}
public ApplicationStartData getApplicationStartData() {
return appStart;
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
public class WritingContainerFinishEvent extends WritingApplicationHistoryEvent {
private ContainerId containerId;
private ContainerFinishData containerFinish;
public WritingContainerFinishEvent(ContainerId containerId,
ContainerFinishData containerFinish) {
super(WritingHistoryEventType.CONTAINER_FINISH);
this.containerId = containerId;
this.containerFinish = containerFinish;
}
@Override
public int hashCode() {
return containerId.getApplicationAttemptId().getApplicationId().hashCode();
}
public ContainerId getContainerId() {
return containerId;
}
public ContainerFinishData getContainerFinishData() {
return containerFinish;
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
public class WritingContainerStartEvent extends WritingApplicationHistoryEvent {
private ContainerId containerId;
private ContainerStartData containerStart;
public WritingContainerStartEvent(ContainerId containerId,
ContainerStartData containerStart) {
super(WritingHistoryEventType.CONTAINER_START);
this.containerId = containerId;
this.containerStart = containerStart;
}
@Override
public int hashCode() {
return containerId.getApplicationAttemptId().getApplicationId().hashCode();
}
public ContainerId getContainerId() {
return containerId;
}
public ContainerStartData getContainerStartData() {
return containerStart;
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
public enum WritingHistoryEventType {
APP_START,
APP_FINISH,
APP_ATTEMPT_START,
APP_ATTEMPT_FINISH,
CONTAINER_START,
CONTAINER_FINISH
}

View File

@ -336,6 +336,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.writeLock = lock.writeLock();
this.stateMachine = stateMachineFactory.make(this);
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
}
@Override
@ -1003,6 +1005,11 @@ public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
// TODO: We need to fix for the problem that RMApp enters the final state
// after RMAppAttempt in the killing case
app.rmContext.getRMApplicationHistoryWriter()
.applicationFinished(app);
};
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@ -178,4 +179,21 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
* @return the start time of the application.
*/
long getStartTime();
/**
* The current state of the {@link RMAppAttempt}.
*
* @return the current state {@link RMAppAttemptState} for this application
* attempt.
*/
RMAppAttemptState getState();
/**
* Create the external user-facing state of the attempt of ApplicationMaster
* from the current state of the {@link RMAppAttempt}.
*
* @return the external user-facing state of the attempt ApplicationMaster.
*/
YarnApplicationAttemptState createApplicationAttemptState();
}

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -62,6 +63,7 @@
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -1046,6 +1048,9 @@ public void transition(RMAppAttemptImpl appAttempt,
appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
appAttemptId, finalAttemptState, keepContainersAcrossAppAttempts));
appAttempt.removeCredentials(appAttempt);
appAttempt.rmContext.getRMApplicationHistoryWriter()
.applicationAttemptFinished(appAttempt);
}
}
@ -1143,6 +1148,9 @@ public void transition(RMAppAttemptImpl appAttempt,
// write at AM launch time, so we don't save the AM's tracking URL anywhere
// as that would mean an extra state-store write. For now, we hope that in
// work-preserving restart, AMs are forced to reregister.
appAttempt.rmContext.getRMApplicationHistoryWriter()
.applicationAttemptStarted(appAttempt);
}
}
@ -1514,6 +1522,23 @@ public long getStartTime() {
}
}
@Override
public RMAppAttemptState getState() {
this.readLock.lock();
try {
return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
}
@Override
public YarnApplicationAttemptState createApplicationAttemptState() {
RMAppAttemptState state = getState();
return RMServerUtils.createApplicationAttemptState(state);
}
private void launchAttempt(){
// Send event to launch the AM Container
eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@ -139,6 +140,7 @@ RMContainerEventType.RELEASED, new KillTransition())
private final ApplicationAttemptId appAttemptId;
private final NodeId nodeId;
private final Container container;
private final RMContext rmContext;
private final EventHandler eventHandler;
private final ContainerAllocationExpirer containerAllocationExpirer;
private final String user;
@ -151,24 +153,26 @@ RMContainerEventType.RELEASED, new KillTransition())
private String logURL;
private ContainerStatus finishedStatus;
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId,
EventHandler handler,
ContainerAllocationExpirer containerAllocationExpirer,
String user) {
String user, RMContext rmContext) {
this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId();
this.nodeId = nodeId;
this.container = container;
this.appAttemptId = appAttemptId;
this.eventHandler = handler;
this.containerAllocationExpirer = containerAllocationExpirer;
this.user = user;
this.startTime = System.currentTimeMillis();
this.rmContext = rmContext;
this.eventHandler = rmContext.getDispatcher().getEventHandler();
this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
rmContext.getRMApplicationHistoryWriter().containerStarted(this);
}
@Override
@ -386,6 +390,9 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
// Inform AppAttempt
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
container.rmContext.getRMApplicationHistoryWriter()
.containerFinished(container);
}
}

View File

@ -233,9 +233,7 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
if (rmContainer == null) {
rmContainer =
new RMContainerImpl(container, getApplicationAttemptId(),
node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
rmContext.getContainerAllocationExpirer(),
appSchedulingInfo.getUser());
node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
Resources.addTo(currentReservation, container.getResource());

View File

@ -121,9 +121,8 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container, this
.getApplicationAttemptId(), node.getNodeID(), this.rmContext
.getDispatcher().getEventHandler(), this.rmContext
.getContainerAllocationExpirer(), appSchedulingInfo.getUser());
.getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), this.rmContext);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);

View File

@ -271,9 +271,8 @@ else if (allowed.equals(NodeType.RACK_LOCAL) &&
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(), rmContext
.getDispatcher().getEventHandler(), rmContext
.getContainerAllocationExpirer(), appSchedulingInfo.getUser());
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);

View File

@ -22,9 +22,9 @@
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.util.HashMap;
import java.util.List;
@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -104,9 +105,10 @@ public static RMContext mockRMContext(int n, long time) {
rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
rmDispatcher);
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext context = new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null, null) {
null, null, null, null, null, writer) {
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return map;

View File

@ -19,21 +19,21 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -51,9 +51,9 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
@ -81,6 +81,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -599,6 +600,8 @@ private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
.thenReturn(queInfo);
when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean()))
.thenThrow(new IOException("queue does not exist"));
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
yarnScheduler);
when(rmContext.getRMApps()).thenReturn(apps);

View File

@ -101,7 +101,7 @@ public void setUp() throws Exception {
rmContext =
new RMContextImpl(rmDispatcher, null, null, null,
mock(DelegationTokenRenewer.class), null, null, null, null);
mock(DelegationTokenRenewer.class), null, null, null, null, null);
scheduler = mock(YarnScheduler.class);
doAnswer(
new Answer<Void>() {

View File

@ -0,0 +1,509 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestRMApplicationHistoryWriter {
private static int MAX_RETRIES = 10;
private RMApplicationHistoryWriter writer;
private ApplicationHistoryStore store;
private List<CounterDispatcher> dispatchers =
new ArrayList<CounterDispatcher>();
@Before
public void setup() {
store = new MemoryApplicationHistoryStore();
Configuration conf = new Configuration();
writer = new RMApplicationHistoryWriter() {
@Override
protected ApplicationHistoryStore createApplicationHistoryStore(
Configuration conf) {
return store;
}
@Override
protected Dispatcher createDispatcher(Configuration conf) {
MultiThreadedDispatcher dispatcher = new MultiThreadedDispatcher(conf.getInt(
YarnConfiguration.RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE,
YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE));
dispatcher.setDrainEventsOnStop();
return dispatcher;
}
class MultiThreadedDispatcher extends
RMApplicationHistoryWriter.MultiThreadedDispatcher {
public MultiThreadedDispatcher(int num) {
super(num);
}
@Override
protected AsyncDispatcher createDispatcher() {
CounterDispatcher dispatcher = new CounterDispatcher();
dispatchers.add(dispatcher);
return dispatcher;
}
}
};
writer.init(conf);
writer.start();
}
@After
public void tearDown() {
writer.stop();
}
private static RMApp createRMApp(ApplicationId appId) {
RMApp app = mock(RMApp.class);
when(app.getApplicationId()).thenReturn(appId);
when(app.getName()).thenReturn("test app");
when(app.getApplicationType()).thenReturn("test app type");
when(app.getUser()).thenReturn("test user");
when(app.getQueue()).thenReturn("test queue");
when(app.getSubmitTime()).thenReturn(0L);
when(app.getStartTime()).thenReturn(1L);
when(app.getFinishTime()).thenReturn(2L);
when(app.getDiagnostics()).thenReturn(
new StringBuilder("test diagnostics info"));
when(app.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
when(app.createApplicationState())
.thenReturn(YarnApplicationState.FINISHED);
return app;
}
private static RMAppAttempt createRMAppAttempt(
ApplicationAttemptId appAttemptId) {
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
when(appAttempt.getHost()).thenReturn("test host");
when(appAttempt.getRpcPort()).thenReturn(-100);
Container container = mock(Container.class);
when(container.getId()).thenReturn(
ContainerId.newInstance(appAttemptId, 1));
when(appAttempt.getMasterContainer()).thenReturn(container);
when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
when(appAttempt.getTrackingUrl()).thenReturn("test url");
when(appAttempt.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
when(appAttempt.createApplicationAttemptState()).thenReturn(
YarnApplicationAttemptState.FINISHED);
return appAttempt;
}
private static RMContainer createRMContainer(
ContainerId containerId) {
RMContainer container = mock(RMContainer.class);
when(container.getContainerId()).thenReturn(containerId);
when(container.getAllocatedNode()).thenReturn(
NodeId.newInstance("test host", -100));
when(container.getAllocatedResource()).thenReturn(
Resource.newInstance(-1, -1));
when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
when(container.getStartTime()).thenReturn(0L);
when(container.getFinishTime()).thenReturn(1L);
when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
when(container.getLogURL()).thenReturn("test log url");
when(container.getContainerExitStatus()).thenReturn(-1);
when(container.getContainerState()).thenReturn(ContainerState.COMPLETE);
return container;
}
@Test
public void testWriteApplication() throws Exception {
RMApp app = createRMApp(ApplicationId.newInstance(0, 1));
writer.applicationStarted(app);
ApplicationHistoryData appHD = null;
for (int i = 0; i < MAX_RETRIES; ++i) {
appHD = store.getApplication(ApplicationId.newInstance(0, 1));
if (appHD != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertNotNull(appHD);
Assert.assertEquals("test app", appHD.getApplicationName());
Assert.assertEquals("test app type", appHD.getApplicationType());
Assert.assertEquals("test user", appHD.getUser());
Assert.assertEquals("test queue", appHD.getQueue());
Assert.assertEquals(0L, appHD.getSubmitTime());
Assert.assertEquals(1L, appHD.getStartTime());
writer.applicationFinished(app);
for (int i = 0; i < MAX_RETRIES; ++i) {
appHD = store.getApplication(ApplicationId.newInstance(0, 1));
if (appHD.getYarnApplicationState() != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertEquals(2L, appHD.getFinishTime());
Assert.assertEquals("test diagnostics info", appHD.getDiagnosticsInfo());
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
appHD.getFinalApplicationStatus());
Assert.assertEquals(YarnApplicationState.FINISHED,
appHD.getYarnApplicationState());
}
@Test
public void testWriteApplicationAttempt() throws Exception {
RMAppAttempt appAttempt = createRMAppAttempt(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1));
writer.applicationAttemptStarted(appAttempt);
ApplicationAttemptHistoryData appAttemptHD = null;
for (int i = 0; i < MAX_RETRIES; ++i) {
appAttemptHD =
store.getApplicationAttempt(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1));
if (appAttemptHD != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertNotNull(appAttemptHD);
Assert.assertEquals("test host", appAttemptHD.getHost());
Assert.assertEquals(-100, appAttemptHD.getRPCPort());
Assert.assertEquals(
ContainerId.newInstance(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1), 1),
appAttemptHD.getMasterContainerId());
writer.applicationAttemptFinished(appAttempt);
for (int i = 0; i < MAX_RETRIES; ++i) {
appAttemptHD =
store.getApplicationAttempt(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1));
if (appAttemptHD.getYarnApplicationAttemptState() != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertEquals("test diagnostics info",
appAttemptHD.getDiagnosticsInfo());
Assert.assertEquals("test url", appAttemptHD.getTrackingURL());
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
appAttemptHD.getFinalApplicationStatus());
Assert.assertEquals(YarnApplicationAttemptState.FINISHED,
appAttemptHD.getYarnApplicationAttemptState());
}
@Test
public void testWriteContainer() throws Exception {
RMContainer container = createRMContainer(
ContainerId.newInstance(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1), 1));
writer.containerStarted(container);
ContainerHistoryData containerHD = null;
for (int i = 0; i < MAX_RETRIES; ++i) {
containerHD =
store.getContainer(ContainerId.newInstance(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1), 1));
if (containerHD != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertNotNull(containerHD);
Assert.assertEquals(NodeId.newInstance("test host", -100),
containerHD.getAssignedNode());
Assert.assertEquals(Resource.newInstance(-1, -1),
containerHD.getAllocatedResource());
Assert.assertEquals(Priority.UNDEFINED, containerHD.getPriority());
Assert.assertEquals(0L, container.getStartTime());
writer.containerFinished(container);
for (int i = 0; i < MAX_RETRIES; ++i) {
containerHD =
store.getContainer(ContainerId.newInstance(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1), 1));
if (containerHD.getContainerState() != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertEquals("test diagnostics info",
containerHD.getDiagnosticsInfo());
Assert.assertEquals("test log url", containerHD.getLogURL());
Assert.assertEquals(-1, containerHD.getContainerExitStatus());
Assert.assertEquals(ContainerState.COMPLETE,
containerHD.getContainerState());
}
@Test
public void testParallelWrite() throws Exception {
List<ApplicationId> appIds = new ArrayList<ApplicationId>();
for (int i = 0; i < 10; ++i) {
Random rand = new Random(i);
ApplicationId appId = ApplicationId.newInstance(0, rand.nextInt());
appIds.add(appId);
RMApp app = createRMApp(appId);
writer.applicationStarted(app);
for (int j = 1; j <= 10; ++j) {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, j);
RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId);
writer.applicationAttemptStarted(appAttempt);
for (int k = 1; k <= 10; ++k) {
ContainerId containerId = ContainerId.newInstance(appAttemptId, k);
RMContainer container = createRMContainer(containerId);
writer.containerStarted(container);
writer.containerFinished(container);
}
writer.applicationAttemptFinished(appAttempt);
}
writer.applicationFinished(app);
}
for (int i = 0; i < MAX_RETRIES; ++i) {
if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) {
break;
} else {
Thread.sleep(500);
}
}
Assert.assertTrue(allEventsHandled(20 * 10 * 10 + 20 * 10 + 20));
// Validate all events of one application are handled by one dispatcher
for (ApplicationId appId : appIds) {
Assert.assertTrue(handledByOne(appId));
}
}
private boolean allEventsHandled(int expected) {
int actual = 0;
for (CounterDispatcher dispatcher : dispatchers) {
for (Integer count : dispatcher.counts.values()) {
actual += count;
}
}
return actual == expected;
}
@Test
public void testRMWritingMassiveHistory() throws Exception {
// 1. Show RM can run with writing history data
// 2. Test additional workload of processing history events
YarnConfiguration conf = new YarnConfiguration();
// don't process history events
MockRM rm = new MockRM(conf) {
@Override
protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
return new RMApplicationHistoryWriter() {
@Override
public void applicationStarted(RMApp app) {
}
@Override
public void applicationFinished(RMApp app) {
}
@Override
public void applicationAttemptStarted(RMAppAttempt appAttempt) {
}
@Override
public void applicationAttemptFinished(RMAppAttempt appAttempt) {
}
@Override
public void containerStarted(RMContainer container) {
}
@Override
public void containerFinished(RMContainer container) {
}
};
}
};
long startTime1 = System.currentTimeMillis();
testRMWritingMassiveHistory(rm);
long finishTime1 = System.currentTimeMillis();
long elapsedTime1 = finishTime1 - startTime1;
rm = new MockRM(conf);
long startTime2 = System.currentTimeMillis();
testRMWritingMassiveHistory(rm);
long finishTime2 = System.currentTimeMillis();
long elapsedTime2 = finishTime2 - startTime2;
// No more than 10% additional workload
// Should be much less, but computation time is fluctuated
Assert.assertTrue(elapsedTime2 - elapsedTime1 < elapsedTime1 / 10);
}
private void testRMWritingMassiveHistory(MockRM rm) throws Exception {
rm.start();
MockNM nm = rm.registerNode("127.0.0.1:1234", 1024 * 10100);
RMApp app = rm.submitApp(1024);
nm.nodeHeartbeat(true);
RMAppAttempt attempt = app.getCurrentAppAttempt();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
int request = 10000;
am.allocate("127.0.0.1" , 1024, request,
new ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
List<Container> allocated = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
int waitCount = 0;
int allocatedSize = allocated.size();
while (allocatedSize < request && waitCount++ < 200) {
Thread.sleep(100);
allocated = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
allocatedSize += allocated.size();
nm.nodeHeartbeat(true);
}
Assert.assertEquals(request, allocatedSize);
am.unregisterAppAttempt();
am.waitForState(RMAppAttemptState.FINISHING);
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
NodeHeartbeatResponse resp = nm.nodeHeartbeat(true);
List<ContainerId> cleaned = resp.getContainersToCleanup();
int cleanedSize = cleaned.size();
waitCount = 0;
while (cleanedSize < allocatedSize && waitCount++ < 200) {
Thread.sleep(100);
resp = nm.nodeHeartbeat(true);
cleaned = resp.getContainersToCleanup();
cleanedSize += cleaned.size();
}
Assert.assertEquals(allocatedSize, cleanedSize);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
rm.stop();
}
private boolean handledByOne(ApplicationId appId) {
int count = 0;
for (CounterDispatcher dispatcher : dispatchers) {
if (dispatcher.counts.containsKey(appId)) {
++count;
}
}
return count == 1;
}
private static class CounterDispatcher extends AsyncDispatcher {
private Map<ApplicationId, Integer> counts =
new HashMap<ApplicationId, Integer>();
@SuppressWarnings("rawtypes")
@Override
protected void dispatch(Event event) {
if (event instanceof WritingApplicationHistoryEvent) {
WritingApplicationHistoryEvent ashEvent =
(WritingApplicationHistoryEvent) event;
switch (ashEvent.getType()) {
case APP_START:
incrementCounts(((WritingApplicationStartEvent) event).getApplicationId());
break;
case APP_FINISH:
incrementCounts(((WritingApplicationFinishEvent) event)
.getApplicationId());
break;
case APP_ATTEMPT_START:
incrementCounts(((WritingApplicationAttemptStartEvent) event)
.getApplicationAttemptId().getApplicationId());
break;
case APP_ATTEMPT_FINISH:
incrementCounts(((WritingApplicationAttemptFinishEvent) event)
.getApplicationAttemptId().getApplicationId());
break;
case CONTAINER_START:
incrementCounts(((WritingContainerStartEvent) event).getContainerId()
.getApplicationAttemptId().getApplicationId());
break;
case CONTAINER_FINISH:
incrementCounts(((WritingContainerFinishEvent) event).getContainerId()
.getApplicationAttemptId().getApplicationId());
break;
}
}
super.dispatch(event);
}
private void incrementCounts(ApplicationId appId) {
Integer val = counts.get(appId);
if (val == null) {
counts.put(appId, 1);
} else {
counts.put(appId, val + 1);
}
}
}
}

View File

@ -72,7 +72,7 @@ public void setUp() {
// Dispatcher that processes events inline
Dispatcher dispatcher = new InlineDispatcher();
RMContext context = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null);
null, null, null, null, null, null, null, null);
dispatcher.register(SchedulerEventType.class,
new InlineDispatcher.EmptyEventHandler());
dispatcher.register(RMNodeEventType.class,

View File

@ -71,7 +71,7 @@ public void setUp() {
new TestRMNodeEventDispatcher());
RMContext context = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null);
null, null, null, null, null, null, null, null);
dispatcher.register(SchedulerEventType.class,
new InlineDispatcher.EmptyEventHandler());
dispatcher.register(RMNodeEventType.class,

View File

@ -71,7 +71,7 @@ public void handle(Event event) {
RMContext context =
new RMContextImpl(dispatcher, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf), null);
new NMTokenSecretManagerInRM(conf), null, null);
dispatcher.register(RMNodeEventType.class,
new ResourceManager.NodeEventDispatcher(context));
NodesListManager nodesListManager = new NodesListManager(context);

View File

@ -20,6 +20,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -49,6 +50,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@ -83,6 +85,7 @@ public class TestRMAppTransitions {
private static int appId = 1;
private DrainDispatcher rmDispatcher;
private RMStateStore store;
private RMApplicationHistoryWriter writer;
private YarnScheduler scheduler;
// ignore all the RM application attempt events
@ -178,13 +181,15 @@ public void setUp() throws Exception {
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
store = mock(RMStateStore.class);
writer = mock(RMApplicationHistoryWriter.class);
this.rmContext =
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM());
new ClientToAMTokenSecretManagerInRM(),
writer);
((RMContextImpl)rmContext).setStateStore(store);
rmDispatcher.register(RMAppAttemptEventType.class,
@ -335,6 +340,7 @@ private void sendAttemptUpdateSavedEvent(RMApp application) {
protected RMApp testCreateAppNewSaving(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext);
verify(writer).applicationStarted(any(RMApp.class));
// NEW => NEW_SAVING event RMAppEventType.START
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
@ -456,6 +462,9 @@ public void testUnmanagedApp() throws IOException {
Assert.assertTrue("Finished app missing diagnostics",
application.getDiagnostics().indexOf(diagMsg) != -1);
// reset the counter of Mockito.verify
reset(writer);
// test app fails after 1 app attempt failure
LOG.info("--- START: testUnmanagedAppFailPath ---");
application = testCreateAppRunning(subContext);
@ -497,6 +506,7 @@ public void testAppNewKill() throws IOException {
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
assertKilled(application);
verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@ -512,6 +522,7 @@ public void testAppNewReject() throws IOException {
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
assertFailed(application, rejectedText);
verify(writer).applicationFinished(any(RMApp.class));
}
@Test (timeout = 30000)
@ -526,6 +537,7 @@ public void testAppNewSavingKill() throws IOException {
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
assertKilled(application);
verify(writer).applicationFinished(any(RMApp.class));
}
@Test (timeout = 30000)
@ -541,6 +553,7 @@ public void testAppNewSavingReject() throws IOException {
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
assertFailed(application, rejectedText);
verify(writer).applicationFinished(any(RMApp.class));
}
@Test (timeout = 30000)
@ -556,6 +569,7 @@ public void testAppSubmittedRejected() throws IOException {
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
assertFailed(application, rejectedText);
verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@ -570,6 +584,7 @@ public void testAppSubmittedKill() throws IOException, InterruptedException {
sendAppUpdateSavedEvent(application);
assertKilled(application);
assertAppFinalStateSaved(application);
verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@ -603,6 +618,7 @@ public void testAppAcceptedFailed() throws IOException {
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
assertFailed(application, ".*" + message + ".*Failing the application.*");
verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@ -617,6 +633,7 @@ public void testAppAcceptedKill() throws IOException, InterruptedException {
sendAppUpdateSavedEvent(application);
assertKilled(application);
assertAppFinalStateSaved(application);
verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@ -639,6 +656,7 @@ public void testAppRunningKill() throws IOException {
sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertKilled(application);
verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@ -691,6 +709,7 @@ public void testAppRunningFailed() throws IOException {
application.handle(event);
rmDispatcher.await();
assertFailed(application, ".*Failing the application.*");
verify(writer).applicationFinished(any(RMApp.class));
}
@Test
@ -748,6 +767,7 @@ public void testAppFinishedFinished() throws IOException {
StringBuilder diag = application.getDiagnostics();
Assert.assertEquals("application diagnostics is not correct",
"", diag.toString());
verify(writer).applicationFinished(any(RMApp.class));
}
@Test (timeout = 30000)
@ -775,6 +795,7 @@ public void testAppFailedFailed() throws IOException {
assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application);
verify(writer).applicationFinished(any(RMApp.class));
}
@Test (timeout = 30000)
@ -820,6 +841,7 @@ public void testAppKilledKilled() throws IOException {
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
verify(writer).applicationFinished(any(RMApp.class));
}
@Test

View File

@ -64,6 +64,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@ -119,6 +120,8 @@ public class TestRMAppAttemptTransitions {
private ApplicationMasterLauncher applicationMasterLauncher;
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMApplicationHistoryWriter writer;
private RMStateStore store;
private RMAppImpl application;
@ -213,13 +216,15 @@ public void setUp() throws Exception {
mock(ContainerAllocationExpirer.class);
amLivelinessMonitor = mock(AMLivelinessMonitor.class);
amFinishingMonitor = mock(AMLivelinessMonitor.class);
writer = mock(RMApplicationHistoryWriter.class);
rmContext =
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, amRMTokenManager,
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
clientToAMTokenManager);
clientToAMTokenManager,
writer);
store = mock(RMStateStore.class);
((RMContextImpl) rmContext).setStateStore(store);
@ -377,6 +382,7 @@ private void testAppAttemptKilledState(Container amContainer,
assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verify(writer).applicationAttemptFinished(any(RMAppAttempt.class));
verifyAttemptFinalStateSaved();
assertFalse(transferStateFromPreviousAttempt);
}
@ -452,6 +458,7 @@ private void testAppAttemptFailedState(Container container,
// Check events
verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verify(writer).applicationAttemptFinished(any(RMAppAttempt.class));
verifyAttemptFinalStateSaved();
}
@ -487,6 +494,7 @@ private void testAppAttemptRunningState(Container container,
assertEquals(getProxyUrl(applicationAttempt),
applicationAttempt.getTrackingUrl());
}
verify(writer).applicationAttemptStarted(any(RMAppAttempt.class));
// TODO - need to add more checks relevant to this state
}

View File

@ -19,9 +19,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -36,6 +39,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@ -48,12 +53,10 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestRMContainerImpl {
@SuppressWarnings("resource")
@Test
public void testReleaseWhileRunning() {
DrainDispatcher drainDispatcher = new DrainDispatcher();
EventHandler eventHandler = drainDispatcher.getEventHandler();
EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(EventHandler.class);
EventHandler generic = mock(EventHandler.class);
drainDispatcher.register(RMAppAttemptEventType.class,
@ -74,19 +77,24 @@ public void testReleaseWhileRunning() {
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, eventHandler, expirer, "user");
nodeId, "user", rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());
assertEquals(resource, rmContainer.getAllocatedResource());
assertEquals(nodeId, rmContainer.getAllocatedNode());
assertEquals(priority, rmContainer.getAllocatedPriority());
verify(writer).containerStarted(any(RMContainer.class));
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
drainDispatcher.await();
assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.ACQUIRED));
drainDispatcher.await();
@ -114,6 +122,7 @@ public void testReleaseWhileRunning() {
assertEquals(ContainerExitStatus.ABORTED,
rmContainer.getContainerExitStatus());
assertEquals(ContainerState.COMPLETE, rmContainer.getContainerState());
verify(writer).containerFinished(any(RMContainer.class));
ArgumentCaptor<RMAppAttemptContainerFinishedEvent> captor = ArgumentCaptor
.forClass(RMAppAttemptContainerFinishedEvent.class);
@ -130,12 +139,10 @@ public void testReleaseWhileRunning() {
assertEquals(RMContainerState.RELEASED, rmContainer.getState());
}
@SuppressWarnings("resource")
@Test
public void testExpireWhileRunning() {
DrainDispatcher drainDispatcher = new DrainDispatcher();
EventHandler eventHandler = drainDispatcher.getEventHandler();
EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(EventHandler.class);
EventHandler generic = mock(EventHandler.class);
drainDispatcher.register(RMAppAttemptEventType.class,
@ -156,13 +163,19 @@ public void testExpireWhileRunning() {
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, eventHandler, expirer, "user");
nodeId, "user", rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());
assertEquals(resource, rmContainer.getAllocatedResource());
assertEquals(nodeId, rmContainer.getAllocatedNode());
assertEquals(priority, rmContainer.getAllocatedPriority());
verify(writer).containerStarted(any(RMContainer.class));
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
@ -191,5 +204,6 @@ public void testExpireWhileRunning() {
containerStatus, RMContainerEventType.EXPIRE));
drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
verify(writer, never()).containerFinished(any(RMContainer.class));
}
}

View File

@ -348,7 +348,7 @@ public void testRefreshQueues() throws Exception {
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));
new ClientToAMTokenSecretManagerInRM(), null));
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
conf.setCapacity(A, 80f);
@ -447,7 +447,7 @@ public void testParseQueue() throws IOException {
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));
new ClientToAMTokenSecretManagerInRM(), null));
}
@Test
@ -460,7 +460,7 @@ public void testReconnectedNode() throws Exception {
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
null, null, new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM()));
new ClientToAMTokenSecretManagerInRM(), null));
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
@ -487,7 +487,7 @@ public void testRefreshQueuesWithNewQueue() throws Exception {
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));
new ClientToAMTokenSecretManagerInRM(), null));
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
// Add a new queue b4
@ -638,7 +638,7 @@ public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null,
null, null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));
new ClientToAMTokenSecretManagerInRM(), null));
SchedulerApplication app =
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(

View File

@ -41,8 +41,8 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -248,14 +248,18 @@ public void testSortedQueues() throws Exception {
ContainerAllocationExpirer expirer =
mock(ContainerAllocationExpirer.class);
DrainDispatcher drainDispatcher = new DrainDispatcher();
EventHandler eventHandler = drainDispatcher.getEventHandler();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
app_0.getApplicationId(), 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
Container container=TestUtils.getMockContainer(containerId,
node_0.getNodeID(), Resources.createResource(1*GB), priority);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
node_0.getNodeID(), eventHandler, expirer, "user");
node_0.getNodeID(), "user", rmContext);
// Assign {1,2,3,4} 1GB containers respectively to queues
stubQueueAllocation(a, clusterResource, node_0, 1*GB);

View File

@ -47,7 +47,7 @@ public void testQueueParsing() throws Exception {
capacityScheduler.reinitialize(conf, new RMContextImpl(null, null,
null, null, null, null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));
new ClientToAMTokenSecretManagerInRM(), null));
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -84,12 +85,13 @@ public EventHandler getEventHandler() {
new ContainerAllocationExpirer(nullDispatcher);
Configuration conf = new Configuration();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext =
new RMContextImpl(nullDispatcher, cae, null, null, null,
new AMRMTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM());
new ClientToAMTokenSecretManagerInRM(), writer);
return rmContext;
}

View File

@ -21,6 +21,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@ -56,6 +58,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -140,8 +143,9 @@ public void testFifoSchedulerCapacityWhenNoNMs() {
@Test(timeout=5000)
public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null);
null, null, null, null, null, null, null, writer);
FifoScheduler schedular = new FifoScheduler();
schedular.reinitialize(new Configuration(), rmContext);
@ -177,8 +181,9 @@ public void testNodeLocalAssignment() throws Exception {
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null);
null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
FifoScheduler scheduler = new FifoScheduler();
scheduler.reinitialize(new Configuration(), rmContext);
@ -241,8 +246,9 @@ public void testUpdateResourceOnNode() throws Exception {
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null);
null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
FifoScheduler scheduler = new FifoScheduler(){
@SuppressWarnings("unused")

View File

@ -163,7 +163,7 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes,
deactivatedNodesMap.put(node.getHostName(), node);
}
return new RMContextImpl(null, null, null, null,
null, null, null, null, null) {
null, null, null, null, null, null) {
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return applicationsMaps;
@ -206,7 +206,7 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException {
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM()));
new ClientToAMTokenSecretManagerInRM(), null));
return cs;
}