YARN-910. Augmented auxiliary services to listen for container starts and completions in addition to application events. Contributed by Alejandro Abdelnur.

svn merge --ignore-ancestry -c 1521298 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1521299 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-09-09 21:49:00 +00:00
parent e5a6acb466
commit e5ee67cbea
10 changed files with 300 additions and 5 deletions

View File

@ -13,6 +13,10 @@ Release 2.3.0 - UNRELEASED
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
YARN-910. Augmented auxiliary services to listen for container starts and
completions in addition to application events. (Alejandro Abdelnur via
vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -79,4 +79,27 @@ public abstract void stopApplication(
* applications.
*/
public abstract ByteBuffer getMetaData();
/**
* A new container is started on this NodeManager. This is a signal to
* this {@link AuxiliaryService} about the container initialization.
* This method is called when the NodeManager receives the container launch
* command from the ApplicationMaster and before the container process is
* launched.
*
* @param initContainerContext context for the container's initialization
*/
public void initializeContainer(ContainerInitializationContext
initContainerContext) {
}
/**
* A container is finishing on this NodeManager. This is a signal to this
* {@link AuxiliaryService} about the same.
*
* @param stopContainerContext context for the container termination
*/
public void stopContainer(ContainerTerminationContext stopContainerContext) {
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* Base context class for {@link AuxiliaryService} initializing and stopping a
* container.
*/
@Public
@Evolving
public class ContainerContext {
private final String user;
private final ContainerId containerId;
private final Resource resource;
@Private
@Unstable
public ContainerContext(String user, ContainerId containerId,
Resource resource) {
this.user = user;
this.containerId = containerId;
this.resource = resource;
}
/**
* Get user of the container being initialized or stopped.
*
* @return the user
*/
public String getUser() {
return user;
}
/**
* Get {@link ContainerId} of the container being initialized or stopped.
*
* @return the container ID
*/
public ContainerId getContainerId() {
return containerId;
}
/**
* Get {@link Resource} the resource capability allocated to the container
* being initialized or stopped.
*
* @return the resource capability.
*/
public Resource getResource() {
return resource;
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* Initialization context for {@link AuxiliaryService} when starting a
* container.
*
*/
@Public
@Evolving
public class ContainerInitializationContext extends ContainerContext {
@Private
@Unstable
public ContainerInitializationContext(String user, ContainerId containerId,
Resource resource) {
super(user, containerId, resource);
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* Termination context for {@link AuxiliaryService} when stopping a
* container.
*
*/
@Public
@Evolving
public class ContainerTerminationContext extends ContainerContext {
@Private
@Unstable
public ContainerTerminationContext(String user, ContainerId containerId,
Resource resource) {
super(user, containerId, resource);
}
}

View File

@ -37,6 +37,8 @@
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
public class AuxServices extends AbstractService
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
@ -178,7 +180,21 @@ public void handle(AuxServicesEvent event) {
.getApplicationID()));
}
break;
default:
case CONTAINER_INIT:
for (AuxiliaryService serv : serviceMap.values()) {
serv.initializeContainer(new ContainerInitializationContext(
event.getUser(), event.getContainer().getContainerId(),
event.getContainer().getResource()));
}
break;
case CONTAINER_STOP:
for (AuxiliaryService serv : serviceMap.values()) {
serv.stopContainer(new ContainerTerminationContext(
event.getUser(), event.getContainer().getContainerId(),
event.getContainer().getResource()));
}
break;
default:
throw new RuntimeException("Unknown type: " + event.getType());
}
}

View File

@ -21,7 +21,10 @@
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.Container;
public class AuxServicesEvent extends AbstractEvent<AuxServicesEventType> {
@ -29,18 +32,30 @@ public class AuxServicesEvent extends AbstractEvent<AuxServicesEventType> {
private final String serviceId;
private final ByteBuffer serviceData;
private final ApplicationId appId;
private final Container container;
public AuxServicesEvent(AuxServicesEventType eventType, ApplicationId appId) {
this(eventType, null, appId, null, null);
}
public AuxServicesEvent(AuxServicesEventType eventType, Container container) {
this(eventType, null, container.getContainerId().getApplicationAttemptId()
.getApplicationId(), null, null, container);
}
public AuxServicesEvent(AuxServicesEventType eventType, String user,
ApplicationId appId, String serviceId, ByteBuffer serviceData) {
this(eventType, user, appId, serviceId, serviceData, null);
}
public AuxServicesEvent(AuxServicesEventType eventType, String user,
ApplicationId appId, String serviceId, ByteBuffer serviceData,
Container container) {
super(eventType);
this.user = user;
this.appId = appId;
this.serviceId = serviceId;
this.serviceData = serviceData;
this.container = container;
}
public String getServiceID() {
@ -59,4 +74,8 @@ public ApplicationId getApplicationID() {
return appId;
}
public Container getContainer() {
return container;
}
}

View File

@ -20,5 +20,7 @@
public enum AuxServicesEventType {
APPLICATION_INIT,
APPLICATION_STOP
APPLICATION_STOP,
CONTAINER_INIT,
CONTAINER_STOP
}

View File

@ -503,6 +503,9 @@ public ContainerState transition(ContainerImpl container,
final ContainerLaunchContext ctxt = container.launchContext;
container.metrics.initingContainer();
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
(AuxServicesEventType.CONTAINER_INIT, container));
// Inform the AuxServices about the opaque serviceData
Map<String,ByteBuffer> csd = ctxt.getServiceData();
if (csd != null) {
@ -820,8 +823,16 @@ public void transition(ContainerImpl container, ContainerEvent event) {
static class ContainerDoneTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@SuppressWarnings("unchecked")
public void transition(ContainerImpl container, ContainerEvent event) {
container.finished();
//if the current state is NEW it means the CONTAINER_INIT was never
// sent for the event, thus no need to send the CONTAINER_STOP
if (container.getCurrentState()
!= org.apache.hadoop.yarn.api.records.ContainerState.NEW) {
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
(AuxServicesEventType.CONTAINER_STOP, container));
}
}
}

View File

@ -22,6 +22,7 @@
import static org.apache.hadoop.service.Service.STATE.STARTED;
import static org.apache.hadoop.service.Service.STATE.STOPPED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -34,11 +35,21 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerImpl;
import org.junit.Test;
public class TestAuxServices {
@ -52,8 +63,10 @@ static class LightService extends AuxiliaryService implements Service
private int remaining_stop;
private ByteBuffer meta = null;
private ArrayList<Integer> stoppedApps;
private ContainerId containerId;
private Resource resource;
LightService(String name, char idef, int expected_appId) {
LightService(String name, char idef, int expected_appId) {
this(name, idef, expected_appId, null);
}
LightService(String name, char idef, int expected_appId, ByteBuffer meta) {
@ -95,7 +108,22 @@ public void stopApplication(ApplicationTerminationContext context) {
public ByteBuffer getMetaData() {
return meta;
}
}
@Override
public void initializeContainer(
ContainerInitializationContext initContainerContext) {
containerId = initContainerContext.getContainerId();
resource = initContainerContext.getResource();
}
@Override
public void stopContainer(
ContainerTerminationContext stopContainerContext) {
containerId = stopContainerContext.getContainerId();
resource = stopContainerContext.getResource();
}
}
static class ServiceA extends LightService {
public ServiceA() {
@ -142,6 +170,35 @@ public void testAuxEventDispatch() {
assertEquals("app not properly stopped", 1, appIds.size());
assertTrue("wrong app stopped", appIds.contains((Integer)66));
}
for (AuxiliaryService serv : servs) {
assertNull(((LightService) serv).containerId);
assertNull(((LightService) serv).resource);
}
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId1, 1);
ContainerTokenIdentifier cti = new ContainerTokenIdentifier(
ContainerId.newInstance(attemptId, 1), "", "",
Resource.newInstance(1, 1), 0,0,0);
Container container = new ContainerImpl(null, null, null, null, null, cti);
ContainerId containerId = container.getContainerId();
Resource resource = container.getResource();
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);
aux.handle(event);
for (AuxiliaryService serv : servs) {
assertEquals(containerId, ((LightService) serv).containerId);
assertEquals(resource, ((LightService) serv).resource);
((LightService) serv).containerId = null;
((LightService) serv).resource = null;
}
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_STOP, container);
aux.handle(event);
for (AuxiliaryService serv : servs) {
assertEquals(containerId, ((LightService) serv).containerId);
assertEquals(resource, ((LightService) serv).resource);
}
}
@Test