From 0f87e4928b547521a078805607a97c908285cde4 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 28 Jan 2014 19:56:28 +0000 Subject: [PATCH] YARN-953. Changed ResourceManager to start writing history data. Contributed by Zhijie Shen. svn merge --ignore-ancestry -c 1556738 ../YARN-321 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562192 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 16 + .../src/main/resources/yarn-default.xml | 23 +- .../NullApplicationHistoryStore.java | 127 +++++ .../impl/pb/ApplicationStartDataPBImpl.java | 2 +- .../pom.xml | 5 + .../server/resourcemanager/RMContext.java | 9 +- .../server/resourcemanager/RMContextImpl.java | 18 +- .../resourcemanager/ResourceManager.java | 10 + .../ahs/RMApplicationHistoryWriter.java | 334 ++++++++++++ .../WritingApplicationAttemptFinishEvent.java | 51 ++ .../WritingApplicationAttemptStartEvent.java | 50 ++ .../ahs/WritingApplicationFinishEvent.java | 50 ++ .../ahs/WritingApplicationHistoryEvent.java | 30 ++ .../ahs/WritingApplicationStartEvent.java | 50 ++ .../ahs/WritingContainerFinishEvent.java | 49 ++ .../ahs/WritingContainerStartEvent.java | 49 ++ .../ahs/WritingHistoryEventType.java | 29 + .../resourcemanager/rmapp/RMAppImpl.java | 7 + .../rmapp/attempt/RMAppAttempt.java | 18 + .../rmapp/attempt/RMAppAttemptImpl.java | 25 + .../rmcontainer/RMContainerImpl.java | 17 +- .../SchedulerApplicationAttempt.java | 4 +- .../common/fica/FiCaSchedulerApp.java | 5 +- .../scheduler/fair/FSSchedulerApp.java | 5 +- .../resourcemanager/TestAppManager.java | 6 +- .../resourcemanager/TestClientRMService.java | 11 +- .../TestRMNodeTransitions.java | 2 +- .../ahs/TestRMApplicationHistoryWriter.java | 509 ++++++++++++++++++ .../resourcetracker/TestNMExpiry.java | 2 +- .../resourcetracker/TestNMReconnect.java | 2 +- .../TestRMNMRPCResponseId.java | 2 +- .../rmapp/TestRMAppTransitions.java | 24 +- .../attempt/TestRMAppAttemptTransitions.java | 10 +- .../rmcontainer/TestRMContainerImpl.java | 28 +- .../capacity/TestCapacityScheduler.java | 10 +- .../capacity/TestChildQueueOrder.java | 10 +- .../scheduler/capacity/TestQueueParsing.java | 2 +- .../scheduler/capacity/TestUtils.java | 4 +- .../scheduler/fifo/TestFifoScheduler.java | 12 +- .../resourcemanager/webapp/TestRMWebApp.java | 4 +- 41 files changed, 1572 insertions(+), 52 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptFinishEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptStartEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationFinishEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationHistoryEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationStartEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerFinishEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerStartEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 143c68cb224..51d65a8a5b7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -493,6 +493,9 @@ Branch YARN-321: Generic ApplicationHistoryService YARN-987. Added ApplicationHistoryManager responsible for exposing reports to all clients. (Mayank Bansal via vinodkv) + YARN-953. Changed ResourceManager to start writing history data. (Zhijie Shen + via vinodkv) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 08b83ca29df..8f8411bf191 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -263,6 +263,22 @@ public class YarnConfiguration extends Configuration { RM_PREFIX + "nodemanagers.heartbeat-interval-ms"; public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000; + /** The setting that controls whether RM writes history data. */ + public static final String RM_HISTORY_WRITER_ENABLED = RM_PREFIX + + "history-writer.enabled"; + public static final boolean DEFAULT_RM_HISTORY_WRITER_ENABLED = false; + + /** Number of worker threads that write the history data. */ + public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE = + RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size"; + public static final int DEFAULT_RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE = + 10; + + /** The implementation class of ApplicationHistoryStore, which is to be used + * by RMApplicationHistoryWriter. */ + public static final String RM_HISTORY_WRITER_CLASS = RM_PREFIX + + "history-writer.class"; + //Delegation token related keys public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = RM_PREFIX + "delegation.key.update-interval"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index cb81926dfb7..2e531c5605e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -564,6 +564,27 @@ org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy + + Enable RM to write history data. If true, then + yarn.resourcemanager.history-writer.class must be specified + yarn.resourcemanager.history-writer.enabled + false + + + + Number of worker threads that write the history data. + yarn.resourcemanager.history-writer.multi-threaded-dispatcher.pool-size + 10 + + + + The implementation class of ApplicationHistoryStore, which is + to be used by RMApplicationHistoryWriter. + + yarn.resourcemanager.history-writer.class + org.apache.hadoop.yarn.server.applicationhistoryservice.NullApplicationHistoryStore + + The hostname of the NM. @@ -1047,7 +1068,7 @@ URI pointing to the location of the FileSystem path where the history will be persisted. This must be supplied when using org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore - as the value for yarn.resourcemanager.ahs.writer.class + as the value for yarn.resourcemanager.history-writer.store.class yarn.ahs.fs-history-store.uri ${hadoop.log.dir}/yarn/system/ahstore diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java new file mode 100644 index 00000000000..3660c10befd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; + +/** + * Dummy implementation of {@link ApplicationHistoryStore}. If this + * implementation is used, no history data will be persisted. + * + */ +@Unstable +@Private +public class NullApplicationHistoryStore extends AbstractService implements + ApplicationHistoryStore { + + public NullApplicationHistoryStore() { + super(NullApplicationHistoryStore.class.getName()); + } + + @Override + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + } + + @Override + public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + } + + @Override + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws IOException { + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + throws IOException { + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) + throws IOException { + return null; + } + + @Override + public Map getAllApplications() + throws IOException { + return Collections.emptyMap(); + } + + @Override + public Map + getApplicationAttempts(ApplicationId appId) throws IOException { + return Collections.emptyMap(); + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + return null; + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) + throws IOException { + return null; + } + + @Override + public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + return null; + } + + @Override + public Map getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + return Collections.emptyMap(); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ApplicationStartDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ApplicationStartDataPBImpl.java index b08023ed09f..6629380b478 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ApplicationStartDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ApplicationStartDataPBImpl.java @@ -148,7 +148,7 @@ public void setQueue(String queue) { @Override public long getSubmitTime() { ApplicationStartDataProtoOrBuilder p = viaProto ? proto : builder; - return p.getStartTime(); + return p.getSubmitTime(); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 7bbe14e6aa7..96316aae2c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -183,6 +183,11 @@ hadoop-yarn-server-common + + org.apache.hadoop + hadoop-yarn-server-applicationhistoryservice + ${project.version} + org.apache.hadoop hadoop-yarn-server-web-proxy diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 1ddb1b48a52..64a4165feb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; @@ -33,8 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; /** @@ -90,4 +91,10 @@ public interface RMContext { void setRMDelegationTokenSecretManager( RMDelegationTokenSecretManager delegationTokenSecretManager); + + RMApplicationHistoryWriter getRMApplicationHistoryWriter(); + + void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter rmApplicationHistoryWriter); + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ec90b4a27d2..79e59831e9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -76,6 +77,7 @@ public class RMContextImpl implements RMContext { private NodesListManager nodesListManager; private ResourceTrackerService resourceTrackerService; private ApplicationMasterService applicationMasterService; + private RMApplicationHistoryWriter rmApplicationHistoryWriter; /** * Default constructor. To be used in conjunction with setter methods for @@ -95,7 +97,8 @@ public RMContextImpl(Dispatcher rmDispatcher, AMRMTokenSecretManager appTokenSecretManager, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, - ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, + RMApplicationHistoryWriter rmApplicationHistoryWriter) { this(); this.setDispatcher(rmDispatcher); this.setContainerAllocationExpirer(containerAllocationExpirer); @@ -106,6 +109,7 @@ public RMContextImpl(Dispatcher rmDispatcher, this.setContainerTokenSecretManager(containerTokenSecretManager); this.setNMTokenSecretManager(nmTokenSecretManager); this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); + this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); RMStateStore nullStore = new NullRMStateStore(); nullStore.setRMDispatcher(rmDispatcher); @@ -318,4 +322,16 @@ public HAServiceState getHAServiceState() { return haServiceState; } } + + @Override + public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { + return rmApplicationHistoryWriter; + } + + @Override + public void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; + } + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index a98be13e3bb..8575cd57d65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; @@ -261,6 +262,10 @@ protected RMAppManager createRMAppManager() { this.applicationACLsManager, this.conf); } + protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { + return new RMApplicationHistoryWriter(); + } + // sanity check for configurations protected static void validateConfigs(Configuration conf) { // validate max-attempts @@ -345,6 +350,11 @@ protected void serviceInit(Configuration configuration) throws Exception { rmContext.setDelegationTokenRenewer(delegationTokenRenewer); } + RMApplicationHistoryWriter rmApplicationHistoryWriter = + createRMApplicationHistoryWriter(); + addService(rmApplicationHistoryWriter); + rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java new file mode 100644 index 00000000000..354fcc4c638 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.applicationhistoryservice.NullApplicationHistoryStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + *

+ * {@link ResourceManager} uses this class to write the information of + * {@link RMApp}, {@link RMAppAttempt} and {@link RMContainer}. These APIs are + * non-blocking, and just schedule a writing history event. An self-contained + * dispatcher vector will handle the event in separate threads, and extract the + * required fields that are going to be persisted. Then, the extracted + * information will be persisted via the implementation of + * {@link ApplicationHistoryStore}. + *

+ */ +@Private +@Unstable +public class RMApplicationHistoryWriter extends CompositeService { + + public static final Log LOG = + LogFactory.getLog(RMApplicationHistoryWriter.class); + + private Dispatcher dispatcher; + private ApplicationHistoryWriter writer; + + public RMApplicationHistoryWriter() { + super(RMApplicationHistoryWriter.class.getName()); + } + + @Override + protected synchronized void serviceInit( + Configuration conf) throws Exception { + writer = createApplicationHistoryStore(conf); + addIfService(writer); + + dispatcher = createDispatcher(conf); + dispatcher.register( + WritingHistoryEventType.class, new ForwardingEventHandler()); + addIfService(dispatcher); + super.serviceInit(conf); + } + + protected Dispatcher createDispatcher(Configuration conf) { + MultiThreadedDispatcher dispatcher = new MultiThreadedDispatcher(conf.getInt( + YarnConfiguration.RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE)); + dispatcher.setDrainEventsOnStop(); + return dispatcher; + } + + protected ApplicationHistoryStore createApplicationHistoryStore( + Configuration conf) { + boolean ahsEnabled = conf.getBoolean( + YarnConfiguration.RM_HISTORY_WRITER_ENABLED, + YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_ENABLED); + // If the history writer is not enabled, a dummy store will be used to + // write nothing + if (ahsEnabled) { + try { + Class storeClass = + conf.getClass(YarnConfiguration.RM_HISTORY_WRITER_CLASS, + NullApplicationHistoryStore.class, + ApplicationHistoryStore.class); + return storeClass.newInstance(); + } catch (Exception e) { + String msg = "Could not instantiate ApplicationHistoryWriter: " + + conf.get(YarnConfiguration.RM_HISTORY_WRITER_CLASS, + NullApplicationHistoryStore.class.getName()); + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + } else { + return new NullApplicationHistoryStore(); + } + } + + protected void handleWritingApplicationHistoryEvent( + WritingApplicationHistoryEvent event) { + switch (event.getType()) { + case APP_START: + WritingApplicationStartEvent wasEvent = + (WritingApplicationStartEvent) event; + try { + writer.applicationStarted(wasEvent.getApplicationStartData()); + LOG.info("Stored the start data of application " + + wasEvent.getApplicationId()); + } catch (IOException e) { + LOG.error("Error when storing the start data of application " + + wasEvent.getApplicationId()); + } + break; + case APP_FINISH: + WritingApplicationFinishEvent wafEvent = + (WritingApplicationFinishEvent) event; + try { + writer.applicationFinished(wafEvent.getApplicationFinishData()); + LOG.info("Stored the finish data of application " + + wafEvent.getApplicationId()); + } catch (IOException e) { + LOG.error("Error when storing the finish data of application " + + wafEvent.getApplicationId()); + } + break; + case APP_ATTEMPT_START: + WritingApplicationAttemptStartEvent waasEvent = + (WritingApplicationAttemptStartEvent) event; + try { + writer.applicationAttemptStarted(waasEvent + .getApplicationAttemptStartData()); + LOG.info("Stored the start data of application attempt " + + waasEvent.getApplicationAttemptId()); + } catch (IOException e) { + LOG.error("Error when storing the start data of application attempt " + + waasEvent.getApplicationAttemptId()); + } + break; + case APP_ATTEMPT_FINISH: + WritingApplicationAttemptFinishEvent waafEvent = + (WritingApplicationAttemptFinishEvent) event; + try { + writer.applicationAttemptFinished(waafEvent + .getApplicationAttemptFinishData()); + LOG.info("Stored the finish data of application attempt " + + waafEvent.getApplicationAttemptId()); + } catch (IOException e) { + LOG.error("Error when storing the finish data of application attempt " + + waafEvent.getApplicationAttemptId()); + } + break; + case CONTAINER_START: + WritingContainerStartEvent wcsEvent = + (WritingContainerStartEvent) event; + try { + writer.containerStarted(wcsEvent.getContainerStartData()); + LOG.info("Stored the start data of container " + + wcsEvent.getContainerId()); + } catch (IOException e) { + LOG.error("Error when storing the start data of container " + + wcsEvent.getContainerId()); + } + break; + case CONTAINER_FINISH: + WritingContainerFinishEvent wcfEvent = + (WritingContainerFinishEvent) event; + try { + writer.containerFinished(wcfEvent.getContainerFinishData()); + LOG.info("Stored the finish data of container " + + wcfEvent.getContainerId()); + } catch (IOException e) { + LOG.error("Error when storing the finish data of container " + + wcfEvent.getContainerId()); + } + break; + default: + LOG.error("Unknown WritingApplicationHistoryEvent type: " + + event.getType()); + } + } + + @SuppressWarnings("unchecked") + public void applicationStarted(RMApp app) { + dispatcher.getEventHandler().handle( + new WritingApplicationStartEvent(app.getApplicationId(), + ApplicationStartData.newInstance(app.getApplicationId(), + app.getName(), app.getApplicationType(), app.getQueue(), + app.getUser(), app.getSubmitTime(), app.getStartTime()))); + } + + @SuppressWarnings("unchecked") + public void applicationFinished(RMApp app) { + dispatcher.getEventHandler().handle( + new WritingApplicationFinishEvent(app.getApplicationId(), + ApplicationFinishData.newInstance(app.getApplicationId(), + app.getFinishTime(), + app.getDiagnostics().toString(), + app.getFinalApplicationStatus(), + app.createApplicationState()))); + } + + @SuppressWarnings("unchecked") + public void applicationAttemptStarted(RMAppAttempt appAttempt) { + dispatcher.getEventHandler().handle( + new WritingApplicationAttemptStartEvent(appAttempt.getAppAttemptId(), + ApplicationAttemptStartData.newInstance( + appAttempt.getAppAttemptId(), appAttempt.getHost(), + appAttempt.getRpcPort(), appAttempt.getMasterContainer() + .getId()))); + } + + @SuppressWarnings("unchecked") + public void applicationAttemptFinished(RMAppAttempt appAttempt) { + dispatcher.getEventHandler().handle( + new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(), + ApplicationAttemptFinishData.newInstance(appAttempt + .getAppAttemptId(), appAttempt.getDiagnostics().toString(), + appAttempt.getTrackingUrl(), appAttempt + .getFinalApplicationStatus(), appAttempt + .createApplicationAttemptState()))); + } + + @SuppressWarnings("unchecked") + public void containerStarted(RMContainer container) { + dispatcher.getEventHandler().handle( + new WritingContainerStartEvent(container.getContainerId(), + ContainerStartData.newInstance(container.getContainerId(), + container.getAllocatedResource(), container.getAllocatedNode(), + container.getAllocatedPriority(), container.getStartTime()))); + } + + @SuppressWarnings("unchecked") + public void containerFinished(RMContainer container) { + dispatcher.getEventHandler().handle( + new WritingContainerFinishEvent(container.getContainerId(), + ContainerFinishData.newInstance(container.getContainerId(), + container.getFinishTime(), container.getDiagnosticsInfo(), + container.getLogURL(), container.getContainerExitStatus(), + container.getContainerState()))); + } + + /** + * EventHandler implementation which forward events to HistoryWriter Making + * use of it, HistoryWriter can avoid to have a public handle method + */ + private final class ForwardingEventHandler + implements EventHandler { + + @Override + public void handle(WritingApplicationHistoryEvent event) { + handleWritingApplicationHistoryEvent(event); + } + + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + protected static class MultiThreadedDispatcher + extends CompositeService implements Dispatcher { + + private List dispatchers = + new ArrayList(); + + public MultiThreadedDispatcher(int num) { + super(MultiThreadedDispatcher.class.getName()); + for (int i = 0; i < num; ++i) { + AsyncDispatcher dispatcher = createDispatcher(); + dispatchers.add(dispatcher); + addIfService(dispatcher); + } + } + + @Override + public EventHandler getEventHandler() { + return new CompositEventHandler(); + } + + @Override + public void register(Class eventType, EventHandler handler) { + for (AsyncDispatcher dispatcher : dispatchers) { + dispatcher.register(eventType, handler); + } + } + + public void setDrainEventsOnStop() { + for (AsyncDispatcher dispatcher : dispatchers) { + dispatcher.setDrainEventsOnStop(); + } + } + + private class CompositEventHandler implements EventHandler { + + @Override + public void handle(Event event) { + // Use hashCode (of ApplicationId) to dispatch the event to the child + // dispatcher, such that all the writing events of one application will + // be handled by one thread, the scheduled order of the these events + // will be preserved + int index = Math.abs(event.hashCode()) % dispatchers.size(); + dispatchers.get(index).getEventHandler().handle(event); + } + + } + + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher(); + } + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptFinishEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptFinishEvent.java new file mode 100644 index 00000000000..3f6a6203555 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptFinishEvent.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; + +public class WritingApplicationAttemptFinishEvent extends + WritingApplicationHistoryEvent { + + private ApplicationAttemptId appAttemptId; + private ApplicationAttemptFinishData appAttemptFinish; + + public WritingApplicationAttemptFinishEvent( + ApplicationAttemptId appAttemptId, + ApplicationAttemptFinishData appAttemptFinish) { + super(WritingHistoryEventType.APP_ATTEMPT_FINISH); + this.appAttemptId = appAttemptId; + this.appAttemptFinish = appAttemptFinish; + } + + @Override + public int hashCode() { + return appAttemptId.getApplicationId().hashCode(); + } + + public ApplicationAttemptId getApplicationAttemptId() { + return appAttemptId; + } + + public ApplicationAttemptFinishData getApplicationAttemptFinishData() { + return appAttemptFinish; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptStartEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptStartEvent.java new file mode 100644 index 00000000000..7e092d3455b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptStartEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; + +public class WritingApplicationAttemptStartEvent extends + WritingApplicationHistoryEvent { + + private ApplicationAttemptId appAttemptId; + private ApplicationAttemptStartData appAttemptStart; + + public WritingApplicationAttemptStartEvent(ApplicationAttemptId appAttemptId, + ApplicationAttemptStartData appAttemptStart) { + super(WritingHistoryEventType.APP_ATTEMPT_START); + this.appAttemptId = appAttemptId; + this.appAttemptStart = appAttemptStart; + } + + @Override + public int hashCode() { + return appAttemptId.getApplicationId().hashCode(); + } + + public ApplicationAttemptId getApplicationAttemptId() { + return appAttemptId; + } + + public ApplicationAttemptStartData getApplicationAttemptStartData() { + return appAttemptStart; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationFinishEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationFinishEvent.java new file mode 100644 index 00000000000..7a202144de1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationFinishEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; + +public class WritingApplicationFinishEvent extends + WritingApplicationHistoryEvent { + + private ApplicationId appId; + private ApplicationFinishData appFinish; + + public WritingApplicationFinishEvent(ApplicationId appId, + ApplicationFinishData appFinish) { + super(WritingHistoryEventType.APP_FINISH); + this.appId = appId; + this.appFinish = appFinish; + } + + @Override + public int hashCode() { + return appId.hashCode(); + } + + public ApplicationId getApplicationId() { + return appId; + } + + public ApplicationFinishData getApplicationFinishData() { + return appFinish; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationHistoryEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationHistoryEvent.java new file mode 100644 index 00000000000..bc17edc62b4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationHistoryEvent.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class WritingApplicationHistoryEvent extends + AbstractEvent { + + public WritingApplicationHistoryEvent(WritingHistoryEventType type) { + super(type); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationStartEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationStartEvent.java new file mode 100644 index 00000000000..1b5dc784c3b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationStartEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; + +public class WritingApplicationStartEvent extends + WritingApplicationHistoryEvent { + + private ApplicationId appId; + private ApplicationStartData appStart; + + public WritingApplicationStartEvent(ApplicationId appId, + ApplicationStartData appStart) { + super(WritingHistoryEventType.APP_START); + this.appId = appId; + this.appStart = appStart; + } + + @Override + public int hashCode() { + return appId.hashCode(); + } + + public ApplicationId getApplicationId() { + return appId; + } + + public ApplicationStartData getApplicationStartData() { + return appStart; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerFinishEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerFinishEvent.java new file mode 100644 index 00000000000..6b271669f56 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerFinishEvent.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; + +public class WritingContainerFinishEvent extends WritingApplicationHistoryEvent { + + private ContainerId containerId; + private ContainerFinishData containerFinish; + + public WritingContainerFinishEvent(ContainerId containerId, + ContainerFinishData containerFinish) { + super(WritingHistoryEventType.CONTAINER_FINISH); + this.containerId = containerId; + this.containerFinish = containerFinish; + } + + @Override + public int hashCode() { + return containerId.getApplicationAttemptId().getApplicationId().hashCode(); + } + + public ContainerId getContainerId() { + return containerId; + } + + public ContainerFinishData getContainerFinishData() { + return containerFinish; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerStartEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerStartEvent.java new file mode 100644 index 00000000000..f6df6691c74 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerStartEvent.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; + +public class WritingContainerStartEvent extends WritingApplicationHistoryEvent { + + private ContainerId containerId; + private ContainerStartData containerStart; + + public WritingContainerStartEvent(ContainerId containerId, + ContainerStartData containerStart) { + super(WritingHistoryEventType.CONTAINER_START); + this.containerId = containerId; + this.containerStart = containerStart; + } + + @Override + public int hashCode() { + return containerId.getApplicationAttemptId().getApplicationId().hashCode(); + } + + public ContainerId getContainerId() { + return containerId; + } + + public ContainerStartData getContainerStartData() { + return containerStart; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java new file mode 100644 index 00000000000..cf27141e1cf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + + +public enum WritingHistoryEventType { + APP_START, + APP_FINISH, + APP_ATTEMPT_START, + APP_ATTEMPT_FINISH, + CONTAINER_START, + CONTAINER_FINISH +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index bbc7b9f609d..55882b30356 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -336,6 +336,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.writeLock = lock.writeLock(); this.stateMachine = stateMachineFactory.make(this); + + rmContext.getRMApplicationHistoryWriter().applicationStarted(this); } @Override @@ -1003,6 +1005,11 @@ public void transition(RMAppImpl app, RMAppEvent event) { app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); + + // TODO: We need to fix for the problem that RMApp enters the final state + // after RMAppAttempt in the killing case + app.rmContext.getRMApplicationHistoryWriter() + .applicationFinished(app); }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 335dbda65e6..3a666dd0710 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -178,4 +179,21 @@ public interface RMAppAttempt extends EventHandler { * @return the start time of the application. */ long getStartTime(); + + /** + * The current state of the {@link RMAppAttempt}. + * + * @return the current state {@link RMAppAttemptState} for this application + * attempt. + */ + RMAppAttemptState getState(); + + /** + * Create the external user-facing state of the attempt of ApplicationMaster + * from the current state of the {@link RMAppAttempt}. + * + * @return the external user-facing state of the attempt ApplicationMaster. + */ + YarnApplicationAttemptState createApplicationAttemptState(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index cd5c9d3bf25..ce246db7ba3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -1046,6 +1048,9 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent( appAttemptId, finalAttemptState, keepContainersAcrossAppAttempts)); appAttempt.removeCredentials(appAttempt); + + appAttempt.rmContext.getRMApplicationHistoryWriter() + .applicationAttemptFinished(appAttempt); } } @@ -1143,6 +1148,9 @@ public void transition(RMAppAttemptImpl appAttempt, // write at AM launch time, so we don't save the AM's tracking URL anywhere // as that would mean an extra state-store write. For now, we hope that in // work-preserving restart, AMs are forced to reregister. + + appAttempt.rmContext.getRMApplicationHistoryWriter() + .applicationAttemptStarted(appAttempt); } } @@ -1514,6 +1522,23 @@ public long getStartTime() { } } + @Override + public RMAppAttemptState getState() { + this.readLock.lock(); + + try { + return this.stateMachine.getCurrentState(); + } finally { + this.readLock.unlock(); + } + } + + @Override + public YarnApplicationAttemptState createApplicationAttemptState() { + RMAppAttemptState state = getState(); + return RMServerUtils.createApplicationAttemptState(state); + } + private void launchAttempt(){ // Send event to launch the AM Container eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index b56f4248f35..adffb997e03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -139,6 +140,7 @@ RMContainerEventType.RELEASED, new KillTransition()) private final ApplicationAttemptId appAttemptId; private final NodeId nodeId; private final Container container; + private final RMContext rmContext; private final EventHandler eventHandler; private final ContainerAllocationExpirer containerAllocationExpirer; private final String user; @@ -151,24 +153,26 @@ RMContainerEventType.RELEASED, new KillTransition()) private String logURL; private ContainerStatus finishedStatus; + public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, - EventHandler handler, - ContainerAllocationExpirer containerAllocationExpirer, - String user) { + String user, RMContext rmContext) { this.stateMachine = stateMachineFactory.make(this); this.containerId = container.getId(); this.nodeId = nodeId; this.container = container; this.appAttemptId = appAttemptId; - this.eventHandler = handler; - this.containerAllocationExpirer = containerAllocationExpirer; this.user = user; this.startTime = System.currentTimeMillis(); + this.rmContext = rmContext; + this.eventHandler = rmContext.getDispatcher().getEventHandler(); + this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); + + rmContext.getRMApplicationHistoryWriter().containerStarted(this); } @Override @@ -386,6 +390,9 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { // Inform AppAttempt container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( container.appAttemptId, finishedEvent.getRemoteContainerStatus())); + + container.rmContext.getRMApplicationHistoryWriter() + .containerFinished(container); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 7b15c261761..38753cbbeda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -233,9 +233,7 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority, if (rmContainer == null) { rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), - node.getNodeID(), rmContext.getDispatcher().getEventHandler(), - rmContext.getContainerAllocationExpirer(), - appSchedulingInfo.getUser()); + node.getNodeID(), appSchedulingInfo.getUser(), rmContext); Resources.addTo(currentReservation, container.getResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 1c093c302bf..9c34f2f5995 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -121,9 +121,8 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, this - .getApplicationAttemptId(), node.getNodeID(), this.rmContext - .getDispatcher().getEventHandler(), this.rmContext - .getContainerAllocationExpirer(), appSchedulingInfo.getUser()); + .getApplicationAttemptId(), node.getNodeID(), + appSchedulingInfo.getUser(), this.rmContext); // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index b32763fe3fc..adabfefaee1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -271,9 +271,8 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, - getApplicationAttemptId(), node.getNodeID(), rmContext - .getDispatcher().getEventHandler(), rmContext - .getContainerAllocationExpirer(), appSchedulingInfo.getUser()); + getApplicationAttemptId(), node.getNodeID(), + appSchedulingInfo.getUser(), rmContext); // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 0a3738200e0..94db331faf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -22,9 +22,9 @@ import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; import java.util.HashMap; import java.util.List; @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -104,9 +105,10 @@ public static RMContext mockRMContext(int n, long time) { rmDispatcher); AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor( rmDispatcher); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext context = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, - null, null, null, null, null) { + null, null, null, null, null, writer) { @Override public ConcurrentMap getRMApps() { return map; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 77398a7a832..7e3d5fe073b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -19,21 +19,21 @@ package org.apache.hadoop.yarn.server.resourcemanager; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -51,9 +51,9 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -599,6 +600,8 @@ private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) .thenReturn(queInfo); when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean())) .thenThrow(new IOException("queue does not exist")); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); ConcurrentHashMap apps = getRMApps(rmContext, yarnScheduler); when(rmContext.getRMApps()).thenReturn(apps); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 82046c7a9de..a966efdc18f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -101,7 +101,7 @@ public void setUp() throws Exception { rmContext = new RMContextImpl(rmDispatcher, null, null, null, - mock(DelegationTokenRenewer.class), null, null, null, null); + mock(DelegationTokenRenewer.class), null, null, null, null, null); scheduler = mock(YarnScheduler.class); doAnswer( new Answer() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java new file mode 100644 index 00000000000..819bcba1e64 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java @@ -0,0 +1,509 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestRMApplicationHistoryWriter { + + private static int MAX_RETRIES = 10; + + private RMApplicationHistoryWriter writer; + private ApplicationHistoryStore store; + private List dispatchers = + new ArrayList(); + + @Before + public void setup() { + store = new MemoryApplicationHistoryStore(); + Configuration conf = new Configuration(); + writer = new RMApplicationHistoryWriter() { + + @Override + protected ApplicationHistoryStore createApplicationHistoryStore( + Configuration conf) { + return store; + } + + @Override + protected Dispatcher createDispatcher(Configuration conf) { + MultiThreadedDispatcher dispatcher = new MultiThreadedDispatcher(conf.getInt( + YarnConfiguration.RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE)); + dispatcher.setDrainEventsOnStop(); + return dispatcher; + } + + class MultiThreadedDispatcher extends + RMApplicationHistoryWriter.MultiThreadedDispatcher { + + public MultiThreadedDispatcher(int num) { + super(num); + } + + @Override + protected AsyncDispatcher createDispatcher() { + CounterDispatcher dispatcher = new CounterDispatcher(); + dispatchers.add(dispatcher); + return dispatcher; + } + + } + }; + writer.init(conf); + writer.start(); + } + + @After + public void tearDown() { + writer.stop(); + } + + private static RMApp createRMApp(ApplicationId appId) { + RMApp app = mock(RMApp.class); + when(app.getApplicationId()).thenReturn(appId); + when(app.getName()).thenReturn("test app"); + when(app.getApplicationType()).thenReturn("test app type"); + when(app.getUser()).thenReturn("test user"); + when(app.getQueue()).thenReturn("test queue"); + when(app.getSubmitTime()).thenReturn(0L); + when(app.getStartTime()).thenReturn(1L); + when(app.getFinishTime()).thenReturn(2L); + when(app.getDiagnostics()).thenReturn( + new StringBuilder("test diagnostics info")); + when(app.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + when(app.createApplicationState()) + .thenReturn(YarnApplicationState.FINISHED); + return app; + } + + private static RMAppAttempt createRMAppAttempt( + ApplicationAttemptId appAttemptId) { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId); + when(appAttempt.getHost()).thenReturn("test host"); + when(appAttempt.getRpcPort()).thenReturn(-100); + Container container = mock(Container.class); + when(container.getId()).thenReturn( + ContainerId.newInstance(appAttemptId, 1)); + when(appAttempt.getMasterContainer()).thenReturn(container); + when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info"); + when(appAttempt.getTrackingUrl()).thenReturn("test url"); + when(appAttempt.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + when(appAttempt.createApplicationAttemptState()).thenReturn( + YarnApplicationAttemptState.FINISHED); + return appAttempt; + } + + private static RMContainer createRMContainer( + ContainerId containerId) { + RMContainer container = mock(RMContainer.class); + when(container.getContainerId()).thenReturn(containerId); + when(container.getAllocatedNode()).thenReturn( + NodeId.newInstance("test host", -100)); + when(container.getAllocatedResource()).thenReturn( + Resource.newInstance(-1, -1)); + when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED); + when(container.getStartTime()).thenReturn(0L); + when(container.getFinishTime()).thenReturn(1L); + when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info"); + when(container.getLogURL()).thenReturn("test log url"); + when(container.getContainerExitStatus()).thenReturn(-1); + when(container.getContainerState()).thenReturn(ContainerState.COMPLETE); + return container; + } + + @Test + public void testWriteApplication() throws Exception { + RMApp app = createRMApp(ApplicationId.newInstance(0, 1)); + + writer.applicationStarted(app); + ApplicationHistoryData appHD = null; + for (int i = 0; i < MAX_RETRIES; ++i) { + appHD = store.getApplication(ApplicationId.newInstance(0, 1)); + if (appHD != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertNotNull(appHD); + Assert.assertEquals("test app", appHD.getApplicationName()); + Assert.assertEquals("test app type", appHD.getApplicationType()); + Assert.assertEquals("test user", appHD.getUser()); + Assert.assertEquals("test queue", appHD.getQueue()); + Assert.assertEquals(0L, appHD.getSubmitTime()); + Assert.assertEquals(1L, appHD.getStartTime()); + + writer.applicationFinished(app); + for (int i = 0; i < MAX_RETRIES; ++i) { + appHD = store.getApplication(ApplicationId.newInstance(0, 1)); + if (appHD.getYarnApplicationState() != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertEquals(2L, appHD.getFinishTime()); + Assert.assertEquals("test diagnostics info", appHD.getDiagnosticsInfo()); + Assert.assertEquals(FinalApplicationStatus.UNDEFINED, + appHD.getFinalApplicationStatus()); + Assert.assertEquals(YarnApplicationState.FINISHED, + appHD.getYarnApplicationState()); + } + + @Test + public void testWriteApplicationAttempt() throws Exception { + RMAppAttempt appAttempt = createRMAppAttempt( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1)); + writer.applicationAttemptStarted(appAttempt); + ApplicationAttemptHistoryData appAttemptHD = null; + for (int i = 0; i < MAX_RETRIES; ++i) { + appAttemptHD = + store.getApplicationAttempt(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1)); + if (appAttemptHD != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertNotNull(appAttemptHD); + Assert.assertEquals("test host", appAttemptHD.getHost()); + Assert.assertEquals(-100, appAttemptHD.getRPCPort()); + Assert.assertEquals( + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1), + appAttemptHD.getMasterContainerId()); + + writer.applicationAttemptFinished(appAttempt); + for (int i = 0; i < MAX_RETRIES; ++i) { + appAttemptHD = + store.getApplicationAttempt(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1)); + if (appAttemptHD.getYarnApplicationAttemptState() != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertEquals("test diagnostics info", + appAttemptHD.getDiagnosticsInfo()); + Assert.assertEquals("test url", appAttemptHD.getTrackingURL()); + Assert.assertEquals(FinalApplicationStatus.UNDEFINED, + appAttemptHD.getFinalApplicationStatus()); + Assert.assertEquals(YarnApplicationAttemptState.FINISHED, + appAttemptHD.getYarnApplicationAttemptState()); + } + + @Test + public void testWriteContainer() throws Exception { + RMContainer container = createRMContainer( + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1)); + writer.containerStarted(container); + ContainerHistoryData containerHD = null; + for (int i = 0; i < MAX_RETRIES; ++i) { + containerHD = + store.getContainer(ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1)); + if (containerHD != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertNotNull(containerHD); + Assert.assertEquals(NodeId.newInstance("test host", -100), + containerHD.getAssignedNode()); + Assert.assertEquals(Resource.newInstance(-1, -1), + containerHD.getAllocatedResource()); + Assert.assertEquals(Priority.UNDEFINED, containerHD.getPriority()); + Assert.assertEquals(0L, container.getStartTime()); + + writer.containerFinished(container); + for (int i = 0; i < MAX_RETRIES; ++i) { + containerHD = + store.getContainer(ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1)); + if (containerHD.getContainerState() != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertEquals("test diagnostics info", + containerHD.getDiagnosticsInfo()); + Assert.assertEquals("test log url", containerHD.getLogURL()); + Assert.assertEquals(-1, containerHD.getContainerExitStatus()); + Assert.assertEquals(ContainerState.COMPLETE, + containerHD.getContainerState()); + } + + @Test + public void testParallelWrite() throws Exception { + List appIds = new ArrayList(); + for (int i = 0; i < 10; ++i) { + Random rand = new Random(i); + ApplicationId appId = ApplicationId.newInstance(0, rand.nextInt()); + appIds.add(appId); + RMApp app = createRMApp(appId); + writer.applicationStarted(app); + for (int j = 1; j <= 10; ++j) { + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, j); + RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId); + writer.applicationAttemptStarted(appAttempt); + for (int k = 1; k <= 10; ++k) { + ContainerId containerId = ContainerId.newInstance(appAttemptId, k); + RMContainer container = createRMContainer(containerId); + writer.containerStarted(container); + writer.containerFinished(container); + } + writer.applicationAttemptFinished(appAttempt); + } + writer.applicationFinished(app); + } + for (int i = 0; i < MAX_RETRIES; ++i) { + if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) { + break; + } else { + Thread.sleep(500); + } + } + Assert.assertTrue(allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)); + // Validate all events of one application are handled by one dispatcher + for (ApplicationId appId : appIds) { + Assert.assertTrue(handledByOne(appId)); + } + } + + private boolean allEventsHandled(int expected) { + int actual = 0; + for (CounterDispatcher dispatcher : dispatchers) { + for (Integer count : dispatcher.counts.values()) { + actual += count; + } + } + return actual == expected; + } + + @Test + public void testRMWritingMassiveHistory() throws Exception { + // 1. Show RM can run with writing history data + // 2. Test additional workload of processing history events + YarnConfiguration conf = new YarnConfiguration(); + // don't process history events + MockRM rm = new MockRM(conf) { + @Override + protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { + return new RMApplicationHistoryWriter() { + @Override + public void applicationStarted(RMApp app) { + } + @Override + public void applicationFinished(RMApp app) { + } + @Override + public void applicationAttemptStarted(RMAppAttempt appAttempt) { + } + @Override + public void applicationAttemptFinished(RMAppAttempt appAttempt) { + } + @Override + public void containerStarted(RMContainer container) { + } + @Override + public void containerFinished(RMContainer container) { + } + }; + } + }; + long startTime1 = System.currentTimeMillis(); + testRMWritingMassiveHistory(rm); + long finishTime1 = System.currentTimeMillis(); + long elapsedTime1 = finishTime1 - startTime1; + rm = new MockRM(conf); + long startTime2 = System.currentTimeMillis(); + testRMWritingMassiveHistory(rm); + long finishTime2 = System.currentTimeMillis(); + long elapsedTime2 = finishTime2 - startTime2; + // No more than 10% additional workload + // Should be much less, but computation time is fluctuated + Assert.assertTrue(elapsedTime2 - elapsedTime1 < elapsedTime1 / 10); + } + + private void testRMWritingMassiveHistory(MockRM rm) throws Exception { + rm.start(); + MockNM nm = rm.registerNode("127.0.0.1:1234", 1024 * 10100); + + RMApp app = rm.submitApp(1024); + nm.nodeHeartbeat(true); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + + int request = 10000; + am.allocate("127.0.0.1" , 1024, request, + new ArrayList()); + nm.nodeHeartbeat(true); + List allocated = am.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + int waitCount = 0; + int allocatedSize = allocated.size(); + while (allocatedSize < request && waitCount++ < 200) { + Thread.sleep(100); + allocated = am.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + allocatedSize += allocated.size(); + nm.nodeHeartbeat(true); + } + Assert.assertEquals(request, allocatedSize); + + am.unregisterAppAttempt(); + am.waitForState(RMAppAttemptState.FINISHING); + nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + + NodeHeartbeatResponse resp = nm.nodeHeartbeat(true); + List cleaned = resp.getContainersToCleanup(); + int cleanedSize = cleaned.size(); + waitCount = 0; + while (cleanedSize < allocatedSize && waitCount++ < 200) { + Thread.sleep(100); + resp = nm.nodeHeartbeat(true); + cleaned = resp.getContainersToCleanup(); + cleanedSize += cleaned.size(); + } + Assert.assertEquals(allocatedSize, cleanedSize); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + + rm.stop(); + } + + private boolean handledByOne(ApplicationId appId) { + int count = 0; + for (CounterDispatcher dispatcher : dispatchers) { + if (dispatcher.counts.containsKey(appId)) { + ++count; + } + } + return count == 1; + } + + private static class CounterDispatcher extends AsyncDispatcher { + + private Map counts = + new HashMap(); + + @SuppressWarnings("rawtypes") + @Override + protected void dispatch(Event event) { + if (event instanceof WritingApplicationHistoryEvent) { + WritingApplicationHistoryEvent ashEvent = + (WritingApplicationHistoryEvent) event; + switch (ashEvent.getType()) { + case APP_START: + incrementCounts(((WritingApplicationStartEvent) event).getApplicationId()); + break; + case APP_FINISH: + incrementCounts(((WritingApplicationFinishEvent) event) + .getApplicationId()); + break; + case APP_ATTEMPT_START: + incrementCounts(((WritingApplicationAttemptStartEvent) event) + .getApplicationAttemptId().getApplicationId()); + break; + case APP_ATTEMPT_FINISH: + incrementCounts(((WritingApplicationAttemptFinishEvent) event) + .getApplicationAttemptId().getApplicationId()); + break; + case CONTAINER_START: + incrementCounts(((WritingContainerStartEvent) event).getContainerId() + .getApplicationAttemptId().getApplicationId()); + break; + case CONTAINER_FINISH: + incrementCounts(((WritingContainerFinishEvent) event).getContainerId() + .getApplicationAttemptId().getApplicationId()); + break; + } + } + super.dispatch(event); + } + + private void incrementCounts(ApplicationId appId) { + Integer val = counts.get(appId); + if (val == null) { + counts.put(appId, 1); + } else { + counts.put(appId, val + 1); + } + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index a884552bf0a..756bf45d77d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -72,7 +72,7 @@ public void setUp() { // Dispatcher that processes events inline Dispatcher dispatcher = new InlineDispatcher(); RMContext context = new RMContextImpl(dispatcher, null, - null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null); dispatcher.register(SchedulerEventType.class, new InlineDispatcher.EmptyEventHandler()); dispatcher.register(RMNodeEventType.class, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index cbb23740de4..455fa785758 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -71,7 +71,7 @@ public void setUp() { new TestRMNodeEventDispatcher()); RMContext context = new RMContextImpl(dispatcher, null, - null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null); dispatcher.register(SchedulerEventType.class, new InlineDispatcher.EmptyEventHandler()); dispatcher.register(RMNodeEventType.class, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index ddb7a90a592..4f9469548ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -71,7 +71,7 @@ public void handle(Event event) { RMContext context = new RMContextImpl(dispatcher, null, null, null, null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), null); + new NMTokenSecretManagerInRM(conf), null, null); dispatcher.register(RMNodeEventType.class, new ResourceManager.NodeEventDispatcher(context)); NodesListManager nodesListManager = new NodesListManager(context); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 1e5733b49eb..d50bc896ef7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -20,6 +20,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; @@ -83,6 +85,7 @@ public class TestRMAppTransitions { private static int appId = 1; private DrainDispatcher rmDispatcher; private RMStateStore store; + private RMApplicationHistoryWriter writer; private YarnScheduler scheduler; // ignore all the RM application attempt events @@ -178,13 +181,15 @@ public void setUp() throws Exception { AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); store = mock(RMStateStore.class); + writer = mock(RMApplicationHistoryWriter.class); this.rmContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, new AMRMTokenSecretManager(conf), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM()); + new ClientToAMTokenSecretManagerInRM(), + writer); ((RMContextImpl)rmContext).setStateStore(store); rmDispatcher.register(RMAppAttemptEventType.class, @@ -335,6 +340,7 @@ private void sendAttemptUpdateSavedEvent(RMApp application) { protected RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); + verify(writer).applicationStarted(any(RMApp.class)); // NEW => NEW_SAVING event RMAppEventType.START RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.START); @@ -456,6 +462,9 @@ public void testUnmanagedApp() throws IOException { Assert.assertTrue("Finished app missing diagnostics", application.getDiagnostics().indexOf(diagMsg) != -1); + // reset the counter of Mockito.verify + reset(writer); + // test app fails after 1 app attempt failure LOG.info("--- START: testUnmanagedAppFailPath ---"); application = testCreateAppRunning(subContext); @@ -497,6 +506,7 @@ public void testAppNewKill() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertKilled(application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -512,6 +522,7 @@ public void testAppNewReject() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); + verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -526,6 +537,7 @@ public void testAppNewSavingKill() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertKilled(application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -541,6 +553,7 @@ public void testAppNewSavingReject() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); + verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -556,6 +569,7 @@ public void testAppSubmittedRejected() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -570,6 +584,7 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { sendAppUpdateSavedEvent(application); assertKilled(application); assertAppFinalStateSaved(application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -603,6 +618,7 @@ public void testAppAcceptedFailed() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertFailed(application, ".*" + message + ".*Failing the application.*"); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -617,6 +633,7 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { sendAppUpdateSavedEvent(application); assertKilled(application); assertAppFinalStateSaved(application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -639,6 +656,7 @@ public void testAppRunningKill() throws IOException { sendAttemptUpdateSavedEvent(application); sendAppUpdateSavedEvent(application); assertKilled(application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -691,6 +709,7 @@ public void testAppRunningFailed() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, ".*Failing the application.*"); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -748,6 +767,7 @@ public void testAppFinishedFinished() throws IOException { StringBuilder diag = application.getDiagnostics(); Assert.assertEquals("application diagnostics is not correct", "", diag.toString()); + verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -775,6 +795,7 @@ public void testAppFailedFailed() throws IOException { assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -820,6 +841,7 @@ public void testAppKilledKilled() throws IOException { assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 4286950c3c6..9c18fa9baac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; @@ -119,6 +120,8 @@ public class TestRMAppAttemptTransitions { private ApplicationMasterLauncher applicationMasterLauncher; private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; + private RMApplicationHistoryWriter writer; + private RMStateStore store; private RMAppImpl application; @@ -213,13 +216,15 @@ public void setUp() throws Exception { mock(ContainerAllocationExpirer.class); amLivelinessMonitor = mock(AMLivelinessMonitor.class); amFinishingMonitor = mock(AMLivelinessMonitor.class); + writer = mock(RMApplicationHistoryWriter.class); rmContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, amRMTokenManager, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - clientToAMTokenManager); + clientToAMTokenManager, + writer); store = mock(RMStateStore.class); ((RMContextImpl) rmContext).setStateStore(store); @@ -377,6 +382,7 @@ private void testAppAttemptKilledState(Container amContainer, assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); verifyAttemptFinalStateSaved(); assertFalse(transferStateFromPreviousAttempt); } @@ -452,6 +458,7 @@ private void testAppAttemptFailedState(Container container, // Check events verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class)); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); verifyAttemptFinalStateSaved(); } @@ -487,6 +494,7 @@ private void testAppAttemptRunningState(Container container, assertEquals(getProxyUrl(applicationAttempt), applicationAttempt.getTrackingUrl()); } + verify(writer).applicationAttemptStarted(any(RMAppAttempt.class)); // TODO - need to add more checks relevant to this state } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 11873f33b61..40504932652 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -19,9 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -36,6 +39,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -48,12 +53,10 @@ @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestRMContainerImpl { - @SuppressWarnings("resource") @Test public void testReleaseWhileRunning() { DrainDispatcher drainDispatcher = new DrainDispatcher(); - EventHandler eventHandler = drainDispatcher.getEventHandler(); EventHandler appAttemptEventHandler = mock(EventHandler.class); EventHandler generic = mock(EventHandler.class); drainDispatcher.register(RMAppAttemptEventType.class, @@ -74,19 +77,24 @@ public void testReleaseWhileRunning() { Container container = BuilderUtils.newContainer(containerId, nodeId, "host:3465", resource, priority, null); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - nodeId, eventHandler, expirer, "user"); + nodeId, "user", rmContext); assertEquals(RMContainerState.NEW, rmContainer.getState()); assertEquals(resource, rmContainer.getAllocatedResource()); assertEquals(nodeId, rmContainer.getAllocatedNode()); assertEquals(priority, rmContainer.getAllocatedPriority()); + verify(writer).containerStarted(any(RMContainer.class)); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); drainDispatcher.await(); assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); - rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); drainDispatcher.await(); @@ -114,6 +122,7 @@ public void testReleaseWhileRunning() { assertEquals(ContainerExitStatus.ABORTED, rmContainer.getContainerExitStatus()); assertEquals(ContainerState.COMPLETE, rmContainer.getContainerState()); + verify(writer).containerFinished(any(RMContainer.class)); ArgumentCaptor captor = ArgumentCaptor .forClass(RMAppAttemptContainerFinishedEvent.class); @@ -130,12 +139,10 @@ public void testReleaseWhileRunning() { assertEquals(RMContainerState.RELEASED, rmContainer.getState()); } - @SuppressWarnings("resource") @Test public void testExpireWhileRunning() { DrainDispatcher drainDispatcher = new DrainDispatcher(); - EventHandler eventHandler = drainDispatcher.getEventHandler(); EventHandler appAttemptEventHandler = mock(EventHandler.class); EventHandler generic = mock(EventHandler.class); drainDispatcher.register(RMAppAttemptEventType.class, @@ -156,13 +163,19 @@ public void testExpireWhileRunning() { Container container = BuilderUtils.newContainer(containerId, nodeId, "host:3465", resource, priority, null); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - nodeId, eventHandler, expirer, "user"); + nodeId, "user", rmContext); assertEquals(RMContainerState.NEW, rmContainer.getState()); assertEquals(resource, rmContainer.getAllocatedResource()); assertEquals(nodeId, rmContainer.getAllocatedNode()); assertEquals(priority, rmContainer.getAllocatedPriority()); + verify(writer).containerStarted(any(RMContainer.class)); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); @@ -191,5 +204,6 @@ public void testExpireWhileRunning() { containerStatus, RMContainerEventType.EXPIRE)); drainDispatcher.await(); assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + verify(writer, never()).containerFinished(any(RMContainer.class)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 08efe29453e..ca60db3f04c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -348,7 +348,7 @@ public void testRefreshQueues() throws Exception { cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); conf.setCapacity(A, 80f); @@ -447,7 +447,7 @@ public void testParseQueue() throws IOException { cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); } @Test @@ -460,7 +460,7 @@ public void testReconnectedNode() throws Exception { cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(csConf), new NMTokenSecretManagerInRM(csConf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1); RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2); @@ -487,7 +487,7 @@ public void testRefreshQueuesWithNewQueue() throws Exception { cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); // Add a new queue b4 @@ -638,7 +638,7 @@ public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); SchedulerApplication app = TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 5943c4c0497..d509771b4e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -41,8 +41,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -248,14 +248,18 @@ public void testSortedQueues() throws Exception { ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); DrainDispatcher drainDispatcher = new DrainDispatcher(); - EventHandler eventHandler = drainDispatcher.getEventHandler(); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( app_0.getApplicationId(), 1); ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); Container container=TestUtils.getMockContainer(containerId, node_0.getNodeID(), Resources.createResource(1*GB), priority); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - node_0.getNodeID(), eventHandler, expirer, "user"); + node_0.getNodeID(), "user", rmContext); // Assign {1,2,3,4} 1GB containers respectively to queues stubQueueAllocation(a, clusterResource, node_0, 1*GB); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index c86d6b3d232..21c446aa4d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -47,7 +47,7 @@ public void testQueueParsing() throws Exception { capacityScheduler.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); CSQueue a = capacityScheduler.getQueue("a"); Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index b974528a3cc..db28dcaa558 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -84,12 +85,13 @@ public EventHandler getEventHandler() { new ContainerAllocationExpirer(nullDispatcher); Configuration conf = new Configuration(); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(nullDispatcher, cae, null, null, null, new AMRMTokenSecretManager(conf), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM()); + new ClientToAMTokenSecretManagerInRM(), writer); return rmContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index af819d1787e..38a7995d7bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; @@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -140,8 +143,9 @@ public void testFifoSchedulerCapacityWhenNoNMs() { @Test(timeout=5000) public void testAppAttemptMetrics() throws Exception { AsyncDispatcher dispatcher = new InlineDispatcher(); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(dispatcher, null, - null, null, null, null, null, null, null); + null, null, null, null, null, null, null, writer); FifoScheduler schedular = new FifoScheduler(); schedular.reinitialize(new Configuration(), rmContext); @@ -177,8 +181,9 @@ public void testNodeLocalAssignment() throws Exception { NMTokenSecretManagerInRM nmTokenSecretManager = new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.rollMasterKey(); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null); + null, containerTokenSecretManager, nmTokenSecretManager, null, writer); FifoScheduler scheduler = new FifoScheduler(); scheduler.reinitialize(new Configuration(), rmContext); @@ -241,8 +246,9 @@ public void testUpdateResourceOnNode() throws Exception { NMTokenSecretManagerInRM nmTokenSecretManager = new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.rollMasterKey(); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null); + null, containerTokenSecretManager, nmTokenSecretManager, null, writer); FifoScheduler scheduler = new FifoScheduler(){ @SuppressWarnings("unused") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java index 74dc95a6a70..2c2aae6f9fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java @@ -163,7 +163,7 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes, deactivatedNodesMap.put(node.getHostName(), node); } return new RMContextImpl(null, null, null, null, - null, null, null, null, null) { + null, null, null, null, null, null) { @Override public ConcurrentMap getRMApps() { return applicationsMaps; @@ -206,7 +206,7 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException { cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); return cs; }