MAPREDUCE-6677. LocalContainerAllocator doesn't specify resource of the containers allocated (haibochen via rkanter)
This commit is contained in:
parent
1268cf5fbe
commit
ecce3b7d53
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
|
@ -177,6 +178,7 @@ public class LocalContainerAllocator extends RMCommunicator
|
||||||
Container container = recordFactory.newRecordInstance(Container.class);
|
Container container = recordFactory.newRecordInstance(Container.class);
|
||||||
container.setId(cID);
|
container.setId(cID);
|
||||||
NodeId nodeId = NodeId.newInstance(this.nmHost, this.nmPort);
|
NodeId nodeId = NodeId.newInstance(this.nmHost, this.nmPort);
|
||||||
|
container.setResource(Resource.newInstance(0, 0));
|
||||||
container.setNodeId(nodeId);
|
container.setNodeId(nodeId);
|
||||||
container.setContainerToken(null);
|
container.setContainerToken(null);
|
||||||
container.setNodeHttpAddress(this.nmHost + ":" + this.nmHttpPort);
|
container.setNodeHttpAddress(this.nmHost + ":" + this.nmHttpPort);
|
||||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.mapreduce.v2.app.local;
|
||||||
|
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -29,10 +31,15 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
|
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
@ -46,12 +53,14 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
|
import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
@ -60,6 +69,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
public class TestLocalContainerAllocator {
|
public class TestLocalContainerAllocator {
|
||||||
|
|
||||||
|
@ -170,6 +180,39 @@ public class TestLocalContainerAllocator {
|
||||||
ugiToken.getService());
|
ugiToken.getService());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAllocatedContainerResourceIsNotNull() {
|
||||||
|
ArgumentCaptor<TaskAttemptContainerAssignedEvent> containerAssignedCaptor
|
||||||
|
= ArgumentCaptor.forClass(TaskAttemptContainerAssignedEvent.class);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
EventHandler<Event> eventHandler = mock(EventHandler.class);
|
||||||
|
AppContext context = mock(AppContext.class) ;
|
||||||
|
when(context.getEventHandler()).thenReturn(eventHandler);
|
||||||
|
ContainerId containerId = ContainerId.fromString(
|
||||||
|
"container_1427562107907_0002_01_000001");
|
||||||
|
LocalContainerAllocator containerAllocator = new LocalContainerAllocator(
|
||||||
|
mock(ClientService.class), context, "localhost", -1, -1, containerId);
|
||||||
|
|
||||||
|
ContainerAllocatorEvent containerAllocatorEvent =
|
||||||
|
createContainerRequestEvent();
|
||||||
|
containerAllocator.handle(containerAllocatorEvent);
|
||||||
|
|
||||||
|
verify(eventHandler, times(1)).handle(containerAssignedCaptor.capture());
|
||||||
|
Container container = containerAssignedCaptor.getValue().getContainer();
|
||||||
|
Resource containerResource = container.getResource();
|
||||||
|
Assert.assertNotNull(containerResource);
|
||||||
|
Assert.assertEquals(containerResource.getMemory(), 0);
|
||||||
|
Assert.assertEquals(containerResource.getVirtualCores(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ContainerAllocatorEvent createContainerRequestEvent() {
|
||||||
|
TaskAttemptId taskAttemptId = mock(TaskAttemptId.class);
|
||||||
|
TaskId taskId = mock(TaskId.class);
|
||||||
|
when(taskAttemptId.getTaskId()).thenReturn(taskId);
|
||||||
|
return new ContainerAllocatorEvent(taskAttemptId,
|
||||||
|
ContainerAllocator.EventType.CONTAINER_REQ);
|
||||||
|
}
|
||||||
|
|
||||||
private static class StubbedLocalContainerAllocator
|
private static class StubbedLocalContainerAllocator
|
||||||
extends LocalContainerAllocator {
|
extends LocalContainerAllocator {
|
||||||
private ApplicationMasterProtocol scheduler;
|
private ApplicationMasterProtocol scheduler;
|
||||||
|
|
|
@ -471,12 +471,6 @@ public class TestMRJobs {
|
||||||
.getValue());
|
.getValue());
|
||||||
Assert.assertEquals(numSleepReducers,
|
Assert.assertEquals(numSleepReducers,
|
||||||
counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
|
counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
|
||||||
Assert
|
|
||||||
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
|
|
||||||
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
|
|
||||||
Assert
|
|
||||||
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
|
|
||||||
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void verifyTaskProgress(Job job) throws InterruptedException,
|
protected void verifyTaskProgress(Job job) throws InterruptedException,
|
||||||
|
@ -548,9 +542,6 @@ public class TestMRJobs {
|
||||||
.getValue());
|
.getValue());
|
||||||
Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
|
Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
|
||||||
.getValue());
|
.getValue());
|
||||||
Assert
|
|
||||||
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
|
|
||||||
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
|
|
Loading…
Reference in New Issue