From e5ee67cbead164ee0dd674ae7c0acb1377bc4eaa Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Mon, 9 Sep 2013 21:49:00 +0000 Subject: [PATCH] YARN-910. Augmented auxiliary services to listen for container starts and completions in addition to application events. Contributed by Alejandro Abdelnur. svn merge --ignore-ancestry -c 1521298 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1521299 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 + .../yarn/server/api/AuxiliaryService.java | 25 ++++++- .../yarn/server/api/ContainerContext.java | 75 +++++++++++++++++++ .../api/ContainerInitializationContext.java | 44 +++++++++++ .../api/ContainerTerminationContext.java | 44 +++++++++++ .../containermanager/AuxServices.java | 18 ++++- .../containermanager/AuxServicesEvent.java | 19 +++++ .../AuxServicesEventType.java | 4 +- .../container/ContainerImpl.java | 11 +++ .../containermanager/TestAuxServices.java | 61 ++++++++++++++- 10 files changed, 300 insertions(+), 5 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cb0aec30995..2928962a8db 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -13,6 +13,10 @@ Release 2.3.0 - UNRELEASED YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza) + YARN-910. Augmented auxiliary services to listen for container starts and + completions in addition to application events. 
(Alejandro Abdelnur via + vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java index 275f2a91038..58b06e274a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java @@ -79,4 +79,27 @@ public abstract class AuxiliaryService extends AbstractService { * applications. */ public abstract ByteBuffer getMetaData(); -} \ No newline at end of file + + /** + * A new container is started on this NodeManager. This is a signal to + * this {@link AuxiliaryService} about the container initialization. + * This method is called when the NodeManager receives the container launch + * command from the ApplicationMaster and before the container process is + * launched. + * + * @param initContainerContext context for the container's initialization + */ + public void initializeContainer(ContainerInitializationContext + initContainerContext) { + } + + /** + * A container is finishing on this NodeManager. This is a signal to this + * {@link AuxiliaryService} about the same. 
+ * + * @param stopContainerContext context for the container termination + */ + public void stopContainer(ContainerTerminationContext stopContainerContext) { + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java new file mode 100644 index 00000000000..d13159b308f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java @@ -0,0 +1,75 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * Base context class for {@link AuxiliaryService} initializing and stopping a + * container. 
+ */ +@Public +@Evolving +public class ContainerContext { + private final String user; + private final ContainerId containerId; + private final Resource resource; + + @Private + @Unstable + public ContainerContext(String user, ContainerId containerId, + Resource resource) { + this.user = user; + this.containerId = containerId; + this.resource = resource; + } + + /** + * Get user of the container being initialized or stopped. + * + * @return the user + */ + public String getUser() { + return user; + } + + /** + * Get {@link ContainerId} of the container being initialized or stopped. + * + * @return the container ID + */ + public ContainerId getContainerId() { + return containerId; + } + + /** + * Get the {@link Resource} capability allocated to the container + * being initialized or stopped. + * + * @return the resource capability. + */ + public Resource getResource() { + return resource; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java new file mode 100644 index 00000000000..5b5bbda0c07 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java @@ -0,0 +1,44 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * Initialization context for {@link AuxiliaryService} when starting a + * container. + * + */ +@Public +@Evolving +public class ContainerInitializationContext extends ContainerContext { + + @Private + @Unstable + public ContainerInitializationContext(String user, ContainerId containerId, + Resource resource) { + super(user, containerId, resource); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java new file mode 100644 index 00000000000..34ba73e2213 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java @@ -0,0 +1,44 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * Termination context for {@link AuxiliaryService} when stopping a + * container. 
+ * + */ +@Public +@Evolving +public class ContainerTerminationContext extends ContainerContext { + + @Private + @Unstable + public ContainerTerminationContext(String user, ContainerId containerId, + Resource resource) { + super(user, containerId, resource); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 955ccbf19ea..13f43650a47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; public class AuxServices extends AbstractService implements ServiceStateChangeListener, EventHandler { @@ -178,7 +180,21 @@ public class AuxServices extends AbstractService .getApplicationID())); } break; - default: + case CONTAINER_INIT: + for (AuxiliaryService serv : serviceMap.values()) { + serv.initializeContainer(new ContainerInitializationContext( + event.getUser(), event.getContainer().getContainerId(), + event.getContainer().getResource())); + } + break; + case CONTAINER_STOP: + for (AuxiliaryService serv : serviceMap.values()) { + 
serv.stopContainer(new ContainerTerminationContext( + event.getUser(), event.getContainer().getContainerId(), + event.getContainer().getResource())); + } + break; + default: throw new RuntimeException("Unknown type: " + event.getType()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java index 4b9c93157b2..1e5a9a737f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java @@ -21,7 +21,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; import java.nio.ByteBuffer; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .Container; public class AuxServicesEvent extends AbstractEvent { @@ -29,18 +32,30 @@ public class AuxServicesEvent extends AbstractEvent { private final String serviceId; private final ByteBuffer serviceData; private final ApplicationId appId; + private final Container container; public AuxServicesEvent(AuxServicesEventType eventType, ApplicationId appId) { this(eventType, null, appId, null, null); } + public AuxServicesEvent(AuxServicesEventType eventType, Container container) { + this(eventType, null, container.getContainerId().getApplicationAttemptId() + .getApplicationId(), null, null, container); + } + public 
AuxServicesEvent(AuxServicesEventType eventType, String user, ApplicationId appId, String serviceId, ByteBuffer serviceData) { + this(eventType, user, appId, serviceId, serviceData, null); + } + public AuxServicesEvent(AuxServicesEventType eventType, String user, + ApplicationId appId, String serviceId, ByteBuffer serviceData, + Container container) { super(eventType); this.user = user; this.appId = appId; this.serviceId = serviceId; this.serviceData = serviceData; + this.container = container; } public String getServiceID() { @@ -59,4 +74,8 @@ public class AuxServicesEvent extends AbstractEvent { return appId; } + public Container getContainer() { + return container; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java index b8276b0ba14..45f3c8f2906 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java @@ -20,5 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; public enum AuxServicesEventType { APPLICATION_INIT, - APPLICATION_STOP + APPLICATION_STOP, + CONTAINER_INIT, + CONTAINER_STOP } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 54a2cbec56d..c2d32b57bee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -503,6 +503,9 @@ public class ContainerImpl implements Container { final ContainerLaunchContext ctxt = container.launchContext; container.metrics.initingContainer(); + container.dispatcher.getEventHandler().handle(new AuxServicesEvent + (AuxServicesEventType.CONTAINER_INIT, container)); + // Inform the AuxServices about the opaque serviceData Map csd = ctxt.getServiceData(); if (csd != null) { @@ -820,8 +823,16 @@ public class ContainerImpl implements Container { static class ContainerDoneTransition implements SingleArcTransition { @Override + @SuppressWarnings("unchecked") public void transition(ContainerImpl container, ContainerEvent event) { container.finished(); + //if the current state is NEW it means the CONTAINER_INIT was never + // sent for the container, thus no need to send the CONTAINER_STOP + if (container.getCurrentState() + != org.apache.hadoop.yarn.api.records.ContainerState.NEW) { + container.dispatcher.getEventHandler().handle(new AuxServicesEvent + (AuxServicesEventType.CONTAINER_STOP, container)); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index fb4b69a21f0..81f758ee063 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.service.Service.STATE.INITED; import static org.apache.hadoop.service.Service.STATE.STARTED; import static org.apache.hadoop.service.Service.STATE.STOPPED; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -34,11 +35,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .Container; +import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerImpl; import org.junit.Test; public class TestAuxServices { @@ -52,8 +63,10 @@ public class TestAuxServices { private int remaining_stop; private ByteBuffer meta = null; private ArrayList stoppedApps; + private ContainerId containerId; + private Resource resource; - LightService(String name, char idef, int expected_appId) { + LightService(String name, char idef, int expected_appId) { this(name, idef, expected_appId, null); } LightService(String name, char idef, int expected_appId, ByteBuffer meta) { @@ -95,7 +108,22 @@ public class TestAuxServices { public ByteBuffer getMetaData() { return meta; } - } + + @Override + public void initializeContainer( + ContainerInitializationContext initContainerContext) { + containerId = initContainerContext.getContainerId(); + resource = initContainerContext.getResource(); + } + + @Override + public void stopContainer( + ContainerTerminationContext stopContainerContext) { + containerId = stopContainerContext.getContainerId(); + resource = stopContainerContext.getResource(); + } + + } static class ServiceA extends LightService { public ServiceA() { @@ -142,6 +170,35 @@ public class TestAuxServices { assertEquals("app not properly stopped", 1, appIds.size()); assertTrue("wrong app stopped", appIds.contains((Integer)66)); } + + for (AuxiliaryService serv : servs) { + assertNull(((LightService) serv).containerId); + assertNull(((LightService) serv).resource); + } + + + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId1, 1); + ContainerTokenIdentifier cti = new ContainerTokenIdentifier( + ContainerId.newInstance(attemptId, 1), "", "", + Resource.newInstance(1, 1), 0,0,0); + Container container = new ContainerImpl(null, null, null, null, null, cti); + ContainerId containerId = container.getContainerId(); + Resource resource = container.getResource(); + event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container); + 
aux.handle(event); + for (AuxiliaryService serv : servs) { + assertEquals(containerId, ((LightService) serv).containerId); + assertEquals(resource, ((LightService) serv).resource); + ((LightService) serv).containerId = null; + ((LightService) serv).resource = null; + } + + event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_STOP, container); + aux.handle(event); + for (AuxiliaryService serv : servs) { + assertEquals(containerId, ((LightService) serv).containerId); + assertEquals(resource, ((LightService) serv).resource); + } } @Test