YARN-974. Added more information to RMContainer to be collected and recorded in Application-History. Contributed by Zhijie Shen.

svn merge --ignore-ancestry -c 1556733 ../YARN-321


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562188 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-01-28 19:53:17 +00:00
parent e8a09094f5
commit f8cd06194d
8 changed files with 147 additions and 8 deletions

View File

@ -487,6 +487,9 @@ Branch YARN-321: Generic ApplicationHistoryService
and Containers from ApplicationHistoryProtocol. (Mayank Bansal and Zhijie Shen and Containers from ApplicationHistoryProtocol. (Mayank Bansal and Zhijie Shen
via vinodkv) via vinodkv)
YARN-974. Added more information to RMContainer to be collected and recorded in
Application-History. (Zhijie Shen via vinodkv)
Release 2.2.0 - 2013-10-13 Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -50,4 +51,22 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
Priority getReservedPriority(); Priority getReservedPriority();
Resource getAllocatedResource();
NodeId getAllocatedNode();
Priority getAllocatedPriority();
long getStartTime();
long getFinishTime();
String getDiagnosticsInfo();
String getLogURL();
int getContainerExitStatus();
ContainerState getContainerState();
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@ -25,9 +27,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -40,6 +45,7 @@ import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public class RMContainerImpl implements RMContainer { public class RMContainerImpl implements RMContainer {
@ -135,15 +141,21 @@ public class RMContainerImpl implements RMContainer {
private final Container container; private final Container container;
private final EventHandler eventHandler; private final EventHandler eventHandler;
private final ContainerAllocationExpirer containerAllocationExpirer; private final ContainerAllocationExpirer containerAllocationExpirer;
private final String user;
private Resource reservedResource; private Resource reservedResource;
private NodeId reservedNode; private NodeId reservedNode;
private Priority reservedPriority; private Priority reservedPriority;
private long startTime;
private long finishTime;
private String logURL;
private ContainerStatus finishedStatus;
public RMContainerImpl(Container container, public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, ApplicationAttemptId appAttemptId, NodeId nodeId,
EventHandler handler, EventHandler handler,
ContainerAllocationExpirer containerAllocationExpirer) { ContainerAllocationExpirer containerAllocationExpirer,
String user) {
this.stateMachine = stateMachineFactory.make(this); this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId(); this.containerId = container.getId();
this.nodeId = nodeId; this.nodeId = nodeId;
@ -151,6 +163,8 @@ public class RMContainerImpl implements RMContainer {
this.appAttemptId = appAttemptId; this.appAttemptId = appAttemptId;
this.eventHandler = handler; this.eventHandler = handler;
this.containerAllocationExpirer = containerAllocationExpirer; this.containerAllocationExpirer = containerAllocationExpirer;
this.user = user;
this.startTime = System.currentTimeMillis();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock(); this.readLock = lock.readLock();
@ -198,6 +212,76 @@ public class RMContainerImpl implements RMContainer {
return reservedPriority; return reservedPriority;
} }
@Override
public Resource getAllocatedResource() {
return container.getResource();
}
@Override
public NodeId getAllocatedNode() {
return container.getNodeId();
}
@Override
public Priority getAllocatedPriority() {
return container.getPriority();
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public long getFinishTime() {
try {
readLock.lock();
return finishTime;
} finally {
readLock.unlock();
}
}
@Override
public String getDiagnosticsInfo() {
try {
readLock.lock();
return finishedStatus.getDiagnostics();
} finally {
readLock.unlock();
}
}
@Override
public String getLogURL() {
try {
readLock.lock();
return logURL;
} finally {
readLock.unlock();
}
}
@Override
public int getContainerExitStatus() {
try {
readLock.lock();
return finishedStatus.getExitStatus();
} finally {
readLock.unlock();
}
}
@Override
public ContainerState getContainerState() {
try {
readLock.lock();
return finishedStatus.getState();
} finally {
readLock.unlock();
}
}
@Override @Override
public String toString() { public String toString() {
return containerId.toString(); return containerId.toString();
@ -276,6 +360,12 @@ public class RMContainerImpl implements RMContainer {
@Override @Override
public void transition(RMContainerImpl container, RMContainerEvent event) { public void transition(RMContainerImpl container, RMContainerEvent event) {
// The logs of running containers should be found on NM webUI
// The logs should be accessible after the container is launched
container.logURL = join(HttpConfig.getSchemePrefix(),
container.container.getNodeHttpAddress(), "/node", "/containerlogs/",
ConverterUtils.toString(container.containerId), "/",
container.user);
// Unregister from containerAllocationExpirer. // Unregister from containerAllocationExpirer.
container.containerAllocationExpirer.unregister(container container.containerAllocationExpirer.unregister(container
.getContainerId()); .getContainerId());
@ -288,6 +378,11 @@ public class RMContainerImpl implements RMContainer {
public void transition(RMContainerImpl container, RMContainerEvent event) { public void transition(RMContainerImpl container, RMContainerEvent event) {
RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
container.finishTime = System.currentTimeMillis();
container.finishedStatus = finishedEvent.getRemoteContainerStatus();
// TODO: when AHS webUI is ready, logURL should be updated to point to
// the web page that will show the aggregated logs
// Inform AppAttempt // Inform AppAttempt
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, finishedEvent.getRemoteContainerStatus())); container.appAttemptId, finishedEvent.getRemoteContainerStatus()));

View File

@ -234,7 +234,8 @@ public abstract class SchedulerApplicationAttempt {
rmContainer = rmContainer =
new RMContainerImpl(container, getApplicationAttemptId(), new RMContainerImpl(container, getApplicationAttemptId(),
node.getNodeID(), rmContext.getDispatcher().getEventHandler(), node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
rmContext.getContainerAllocationExpirer()); rmContext.getContainerAllocationExpirer(),
appSchedulingInfo.getUser());
Resources.addTo(currentReservation, container.getResource()); Resources.addTo(currentReservation, container.getResource());

View File

@ -123,7 +123,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
RMContainer rmContainer = new RMContainerImpl(container, this RMContainer rmContainer = new RMContainerImpl(container, this
.getApplicationAttemptId(), node.getNodeID(), this.rmContext .getApplicationAttemptId(), node.getNodeID(), this.rmContext
.getDispatcher().getEventHandler(), this.rmContext .getDispatcher().getEventHandler(), this.rmContext
.getContainerAllocationExpirer()); .getContainerAllocationExpirer(), appSchedulingInfo.getUser());
// Add it to allContainers list. // Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer); newlyAllocatedContainers.add(rmContainer);

View File

@ -273,7 +273,7 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt {
RMContainer rmContainer = new RMContainerImpl(container, RMContainer rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(), rmContext getApplicationAttemptId(), node.getNodeID(), rmContext
.getDispatcher().getEventHandler(), rmContext .getDispatcher().getEventHandler(), rmContext
.getContainerAllocationExpirer()); .getContainerAllocationExpirer(), appSchedulingInfo.getUser());
// Add it to allContainers list. // Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer); newlyAllocatedContainers.add(rmContainer);

View File

@ -26,7 +26,9 @@ import static org.mockito.Mockito.verify;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
@ -46,6 +48,7 @@ import org.mockito.ArgumentCaptor;
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public class TestRMContainerImpl { public class TestRMContainerImpl {
@SuppressWarnings("resource")
@Test @Test
public void testReleaseWhileRunning() { public void testReleaseWhileRunning() {
@ -72,9 +75,12 @@ public class TestRMContainerImpl {
"host:3465", resource, priority, null); "host:3465", resource, priority, null);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, eventHandler, expirer); nodeId, eventHandler, expirer, "user");
assertEquals(RMContainerState.NEW, rmContainer.getState()); assertEquals(RMContainerState.NEW, rmContainer.getState());
assertEquals(resource, rmContainer.getAllocatedResource());
assertEquals(nodeId, rmContainer.getAllocatedNode());
assertEquals(priority, rmContainer.getAllocatedPriority());
rmContainer.handle(new RMContainerEvent(containerId, rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START)); RMContainerEventType.START));
@ -90,6 +96,9 @@ public class TestRMContainerImpl {
RMContainerEventType.LAUNCHED)); RMContainerEventType.LAUNCHED));
drainDispatcher.await(); drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState()); assertEquals(RMContainerState.RUNNING, rmContainer.getState());
assertEquals(
"http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
rmContainer.getLogURL());
// In RUNNING state. Verify RELEASED and associated actions. // In RUNNING state. Verify RELEASED and associated actions.
reset(appAttemptEventHandler); reset(appAttemptEventHandler);
@ -100,6 +109,11 @@ public class TestRMContainerImpl {
containerStatus, RMContainerEventType.RELEASED)); containerStatus, RMContainerEventType.RELEASED));
drainDispatcher.await(); drainDispatcher.await();
assertEquals(RMContainerState.RELEASED, rmContainer.getState()); assertEquals(RMContainerState.RELEASED, rmContainer.getState());
assertEquals(SchedulerUtils.RELEASED_CONTAINER,
rmContainer.getDiagnosticsInfo());
assertEquals(ContainerExitStatus.ABORTED,
rmContainer.getContainerExitStatus());
assertEquals(ContainerState.COMPLETE, rmContainer.getContainerState());
ArgumentCaptor<RMAppAttemptContainerFinishedEvent> captor = ArgumentCaptor ArgumentCaptor<RMAppAttemptContainerFinishedEvent> captor = ArgumentCaptor
.forClass(RMAppAttemptContainerFinishedEvent.class); .forClass(RMAppAttemptContainerFinishedEvent.class);
@ -116,6 +130,7 @@ public class TestRMContainerImpl {
assertEquals(RMContainerState.RELEASED, rmContainer.getState()); assertEquals(RMContainerState.RELEASED, rmContainer.getState());
} }
@SuppressWarnings("resource")
@Test @Test
public void testExpireWhileRunning() { public void testExpireWhileRunning() {
@ -142,9 +157,12 @@ public class TestRMContainerImpl {
"host:3465", resource, priority, null); "host:3465", resource, priority, null);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, eventHandler, expirer); nodeId, eventHandler, expirer, "user");
assertEquals(RMContainerState.NEW, rmContainer.getState()); assertEquals(RMContainerState.NEW, rmContainer.getState());
assertEquals(resource, rmContainer.getAllocatedResource());
assertEquals(nodeId, rmContainer.getAllocatedNode());
assertEquals(priority, rmContainer.getAllocatedPriority());
rmContainer.handle(new RMContainerEvent(containerId, rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START)); RMContainerEventType.START));
@ -160,6 +178,9 @@ public class TestRMContainerImpl {
RMContainerEventType.LAUNCHED)); RMContainerEventType.LAUNCHED));
drainDispatcher.await(); drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState()); assertEquals(RMContainerState.RUNNING, rmContainer.getState());
assertEquals(
"http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
rmContainer.getLogURL());
// In RUNNING state. Verify EXPIRE and associated actions. // In RUNNING state. Verify EXPIRE and associated actions.
reset(appAttemptEventHandler); reset(appAttemptEventHandler);

View File

@ -255,7 +255,7 @@ public class TestChildQueueOrder {
Container container=TestUtils.getMockContainer(containerId, Container container=TestUtils.getMockContainer(containerId,
node_0.getNodeID(), Resources.createResource(1*GB), priority); node_0.getNodeID(), Resources.createResource(1*GB), priority);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
node_0.getNodeID(), eventHandler, expirer); node_0.getNodeID(), eventHandler, expirer, "user");
// Assign {1,2,3,4} 1GB containers respectively to queues // Assign {1,2,3,4} 1GB containers respectively to queues
stubQueueAllocation(a, clusterResource, node_0, 1*GB); stubQueueAllocation(a, clusterResource, node_0, 1*GB);