YARN-910. Augmented auxiliary services to listen for container starts and completions in addition to application events. Contributed by Alejandro Abdelnur.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1521298 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1a649aa51a
commit
0f91d8485a
|
@ -28,6 +28,10 @@ Release 2.3.0 - UNRELEASED
|
|||
|
||||
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
|
||||
|
||||
YARN-910. Augmented auxiliary services to listen for container starts and
|
||||
completions in addition to application events. (Alejandro Abdelnur via
|
||||
vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -79,4 +79,27 @@ public abstract class AuxiliaryService extends AbstractService {
|
|||
* applications.
|
||||
*/
|
||||
public abstract ByteBuffer getMetaData();
|
||||
|
||||
/**
|
||||
* A new container is started on this NodeManager. This is a signal to
|
||||
* this {@link AuxiliaryService} about the container initialization.
|
||||
* This method is called when the NodeManager receives the container launch
|
||||
* command from the ApplicationMaster and before the container process is
|
||||
* launched.
|
||||
*
|
||||
* @param initContainerContext context for the container's initialization
|
||||
*/
|
||||
public void initializeContainer(ContainerInitializationContext
|
||||
initContainerContext) {
|
||||
}
|
||||
|
||||
/**
|
||||
* A container is finishing on this NodeManager. This is a signal to this
|
||||
* {@link AuxiliaryService} about the same.
|
||||
*
|
||||
* @param stopContainerContext context for the container termination
|
||||
*/
|
||||
public void stopContainer(ContainerTerminationContext stopContainerContext) {
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
/**
|
||||
* Base context class for {@link AuxiliaryService} initializing and stopping a
|
||||
* container.
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public class ContainerContext {
|
||||
private final String user;
|
||||
private final ContainerId containerId;
|
||||
private final Resource resource;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ContainerContext(String user, ContainerId containerId,
|
||||
Resource resource) {
|
||||
this.user = user;
|
||||
this.containerId = containerId;
|
||||
this.resource = resource;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get user of the container being initialized or stopped.
|
||||
*
|
||||
* @return the user
|
||||
*/
|
||||
public String getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get {@link ContainerId} of the container being initialized or stopped.
|
||||
*
|
||||
* @return the container ID
|
||||
*/
|
||||
public ContainerId getContainerId() {
|
||||
return containerId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get {@link Resource} the resource capability allocated to the container
|
||||
* being initialized or stopped.
|
||||
*
|
||||
* @return the resource capability.
|
||||
*/
|
||||
public Resource getResource() {
|
||||
return resource;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
/**
|
||||
* Initialization context for {@link AuxiliaryService} when starting a
|
||||
* container.
|
||||
*
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public class ContainerInitializationContext extends ContainerContext {
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ContainerInitializationContext(String user, ContainerId containerId,
|
||||
Resource resource) {
|
||||
super(user, containerId, resource);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
/**
|
||||
* Termination context for {@link AuxiliaryService} when stopping a
|
||||
* container.
|
||||
*
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public class ContainerTerminationContext extends ContainerContext {
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ContainerTerminationContext(String user, ContainerId containerId,
|
||||
Resource resource) {
|
||||
super(user, containerId, resource);
|
||||
}
|
||||
|
||||
}
|
|
@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
|||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
||||
|
||||
public class AuxServices extends AbstractService
|
||||
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
|
||||
|
@ -178,6 +180,20 @@ public class AuxServices extends AbstractService
|
|||
.getApplicationID()));
|
||||
}
|
||||
break;
|
||||
case CONTAINER_INIT:
|
||||
for (AuxiliaryService serv : serviceMap.values()) {
|
||||
serv.initializeContainer(new ContainerInitializationContext(
|
||||
event.getUser(), event.getContainer().getContainerId(),
|
||||
event.getContainer().getResource()));
|
||||
}
|
||||
break;
|
||||
case CONTAINER_STOP:
|
||||
for (AuxiliaryService serv : serviceMap.values()) {
|
||||
serv.stopContainer(new ContainerTerminationContext(
|
||||
event.getUser(), event.getContainer().getContainerId(),
|
||||
event.getContainer().getResource()));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unknown type: " + event.getType());
|
||||
}
|
||||
|
|
|
@ -21,7 +21,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
|||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
||||
.Container;
|
||||
|
||||
public class AuxServicesEvent extends AbstractEvent<AuxServicesEventType> {
|
||||
|
||||
|
@ -29,18 +32,30 @@ public class AuxServicesEvent extends AbstractEvent<AuxServicesEventType> {
|
|||
private final String serviceId;
|
||||
private final ByteBuffer serviceData;
|
||||
private final ApplicationId appId;
|
||||
private final Container container;
|
||||
|
||||
public AuxServicesEvent(AuxServicesEventType eventType, ApplicationId appId) {
|
||||
this(eventType, null, appId, null, null);
|
||||
}
|
||||
|
||||
public AuxServicesEvent(AuxServicesEventType eventType, Container container) {
|
||||
this(eventType, null, container.getContainerId().getApplicationAttemptId()
|
||||
.getApplicationId(), null, null, container);
|
||||
}
|
||||
|
||||
public AuxServicesEvent(AuxServicesEventType eventType, String user,
|
||||
ApplicationId appId, String serviceId, ByteBuffer serviceData) {
|
||||
this(eventType, user, appId, serviceId, serviceData, null);
|
||||
}
|
||||
public AuxServicesEvent(AuxServicesEventType eventType, String user,
|
||||
ApplicationId appId, String serviceId, ByteBuffer serviceData,
|
||||
Container container) {
|
||||
super(eventType);
|
||||
this.user = user;
|
||||
this.appId = appId;
|
||||
this.serviceId = serviceId;
|
||||
this.serviceData = serviceData;
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
public String getServiceID() {
|
||||
|
@ -59,4 +74,8 @@ public class AuxServicesEvent extends AbstractEvent<AuxServicesEventType> {
|
|||
return appId;
|
||||
}
|
||||
|
||||
public Container getContainer() {
|
||||
return container;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,5 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
|||
|
||||
public enum AuxServicesEventType {
|
||||
APPLICATION_INIT,
|
||||
APPLICATION_STOP
|
||||
APPLICATION_STOP,
|
||||
CONTAINER_INIT,
|
||||
CONTAINER_STOP
|
||||
}
|
||||
|
|
|
@ -503,6 +503,9 @@ public class ContainerImpl implements Container {
|
|||
final ContainerLaunchContext ctxt = container.launchContext;
|
||||
container.metrics.initingContainer();
|
||||
|
||||
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
|
||||
(AuxServicesEventType.CONTAINER_INIT, container));
|
||||
|
||||
// Inform the AuxServices about the opaque serviceData
|
||||
Map<String,ByteBuffer> csd = ctxt.getServiceData();
|
||||
if (csd != null) {
|
||||
|
@ -820,8 +823,16 @@ public class ContainerImpl implements Container {
|
|||
static class ContainerDoneTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
container.finished();
|
||||
//if the current state is NEW it means the CONTAINER_INIT was never
|
||||
// sent for the event, thus no need to send the CONTAINER_STOP
|
||||
if (container.getCurrentState()
|
||||
!= org.apache.hadoop.yarn.api.records.ContainerState.NEW) {
|
||||
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
|
||||
(AuxServicesEventType.CONTAINER_STOP, container));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.apache.hadoop.service.Service.STATE.INITED;
|
|||
import static org.apache.hadoop.service.Service.STATE.STARTED;
|
||||
import static org.apache.hadoop.service.Service.STATE.STOPPED;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -34,11 +35,21 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
||||
.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
||||
.ContainerImpl;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestAuxServices {
|
||||
|
@ -52,6 +63,8 @@ public class TestAuxServices {
|
|||
private int remaining_stop;
|
||||
private ByteBuffer meta = null;
|
||||
private ArrayList<Integer> stoppedApps;
|
||||
private ContainerId containerId;
|
||||
private Resource resource;
|
||||
|
||||
LightService(String name, char idef, int expected_appId) {
|
||||
this(name, idef, expected_appId, null);
|
||||
|
@ -95,6 +108,21 @@ public class TestAuxServices {
|
|||
public ByteBuffer getMetaData() {
|
||||
return meta;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initializeContainer(
|
||||
ContainerInitializationContext initContainerContext) {
|
||||
containerId = initContainerContext.getContainerId();
|
||||
resource = initContainerContext.getResource();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopContainer(
|
||||
ContainerTerminationContext stopContainerContext) {
|
||||
containerId = stopContainerContext.getContainerId();
|
||||
resource = stopContainerContext.getResource();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class ServiceA extends LightService {
|
||||
|
@ -142,6 +170,35 @@ public class TestAuxServices {
|
|||
assertEquals("app not properly stopped", 1, appIds.size());
|
||||
assertTrue("wrong app stopped", appIds.contains((Integer)66));
|
||||
}
|
||||
|
||||
for (AuxiliaryService serv : servs) {
|
||||
assertNull(((LightService) serv).containerId);
|
||||
assertNull(((LightService) serv).resource);
|
||||
}
|
||||
|
||||
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId1, 1);
|
||||
ContainerTokenIdentifier cti = new ContainerTokenIdentifier(
|
||||
ContainerId.newInstance(attemptId, 1), "", "",
|
||||
Resource.newInstance(1, 1), 0,0,0);
|
||||
Container container = new ContainerImpl(null, null, null, null, null, cti);
|
||||
ContainerId containerId = container.getContainerId();
|
||||
Resource resource = container.getResource();
|
||||
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);
|
||||
aux.handle(event);
|
||||
for (AuxiliaryService serv : servs) {
|
||||
assertEquals(containerId, ((LightService) serv).containerId);
|
||||
assertEquals(resource, ((LightService) serv).resource);
|
||||
((LightService) serv).containerId = null;
|
||||
((LightService) serv).resource = null;
|
||||
}
|
||||
|
||||
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_STOP, container);
|
||||
aux.handle(event);
|
||||
for (AuxiliaryService serv : servs) {
|
||||
assertEquals(containerId, ((LightService) serv).containerId);
|
||||
assertEquals(resource, ((LightService) serv).resource);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue