MAPREDUCE-5270. Migrated MR app from using BuilderUtil factory methods to individual record factory methods. Contributed by Jian He.

svn merge --ignore-ancestry -c 1486271 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1487158 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-28 22:38:14 +00:00
parent ea60ea3b85
commit a2303831b4
25 changed files with 123 additions and 117 deletions

View File

@ -96,6 +96,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5230. Bring back NLineInputFormat.createFileSplit for binary MAPREDUCE-5230. Bring back NLineInputFormat.createFileSplit for binary
compatibility with mapred in 1.x (Mayank Bansal via vinodkv) compatibility with mapred in 1.x (Mayank Bansal via vinodkv)
MAPREDUCE-5270. Migrated MR app from using BuilderUtil factory methods to
individual record factory methods. (Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -132,7 +132,6 @@
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
@ -598,7 +597,7 @@ private static LocalResource createLocalResource(FileSystem fc, Path file,
long resourceSize = fstat.getLen(); long resourceSize = fstat.getLen();
long resourceModificationTime = fstat.getModificationTime(); long resourceModificationTime = fstat.getModificationTime();
return BuilderUtils.newLocalResource(resourceURL, type, visibility, return LocalResource.newInstance(resourceURL, type, visibility,
resourceSize, resourceModificationTime); resourceSize, resourceModificationTime);
} }
@ -762,10 +761,9 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
// Construct the actual Container // Construct the actual Container
// The null fields are per-container and will be constructed for each // The null fields are per-container and will be constructed for each
// container separately. // container separately.
ContainerLaunchContext container = BuilderUtils ContainerLaunchContext container =
.newContainerLaunchContext(localResources, ContainerLaunchContext.newInstance(localResources, environment, null,
environment, null, serviceData, taskCredentialsBuffer, serviceData, taskCredentialsBuffer, applicationACLs);
applicationACLs);
return container; return container;
} }
@ -806,7 +804,7 @@ static ContainerLaunchContext createContainerLaunchContext(
} }
// Construct the actual Container // Construct the actual Container
ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext( ContainerLaunchContext container = ContainerLaunchContext.newInstance(
commonContainerSpec.getLocalResources(), myEnv, commands, commonContainerSpec.getLocalResources(), myEnv, commands,
myServiceData, commonContainerSpec.getTokens().duplicate(), myServiceData, commonContainerSpec.getTokens().duplicate(),
applicationACLs); applicationACLs);
@ -1096,7 +1094,7 @@ public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
// launching the container on an NM, these are already completed tasks, so // launching the container on an NM, these are already completed tasks, so
// setting them to null and RMIdentifier as 0 // setting them to null and RMIdentifier as 0
container = container =
BuilderUtils.newContainer(containerId, containerNodeId, Container.newInstance(containerId, containerNodeId,
nodeHttpAddress, null, null, null, 0); nodeHttpAddress, null, null, null, 0);
computeRackAndLocality(); computeRackAndLocality();
launchTime = taInfo.getStartTime(); launchTime = taInfo.getStartTime();

View File

@ -239,7 +239,11 @@ protected void containerFailedOnHost(String hostName) {
// if ask already sent to RM, we can try and overwrite it if possible. // if ask already sent to RM, we can try and overwrite it if possible.
// send a new ask to RM with numContainers // send a new ask to RM with numContainers
// specified for the blacklisted host to be 0. // specified for the blacklisted host to be 0.
ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req); ResourceRequest zeroedRequest =
ResourceRequest.newInstance(req.getPriority(),
req.getHostName(), req.getCapability(),
req.getNumContainers());
zeroedRequest.setNumContainers(0); zeroedRequest.setNumContainers(0);
// to be sent to RM on next heartbeat // to be sent to RM on next heartbeat
addResourceRequestToAsk(zeroedRequest); addResourceRequestToAsk(zeroedRequest);

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.BuilderUtils;
@XmlRootElement(name = "jobAttempt") @XmlRootElement(name = "jobAttempt")
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
@ -53,7 +52,7 @@ public AMAttemptInfo(AMInfo amInfo, String jobId, String user) {
int nmPort = amInfo.getNodeManagerPort(); int nmPort = amInfo.getNodeManagerPort();
if (nmHost != null) { if (nmHost != null) {
this.nodeHttpAddress = nmHost + ":" + nmHttpPort; this.nodeHttpAddress = nmHost + ":" + nmHttpPort;
NodeId nodeId = BuilderUtils.newNodeId(nmHost, nmPort); NodeId nodeId = NodeId.newInstance(nmHost, nmPort);
this.nodeId = nodeId.toString(); this.nodeId = nodeId.toString();
} }

View File

@ -272,8 +272,8 @@ private class TestParams {
String workDir = setupTestWorkDir(); String workDir = setupTestWorkDir();
ApplicationId appId = BuilderUtils.newApplicationId(200, 1); ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1); ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005"); TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
JobId jobId = MRBuilderUtils.newJobId(appId, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1);
AppContext mockAppContext = mockAppContext(appId); AppContext mockAppContext = mockAppContext(appId);

View File

@ -91,13 +91,13 @@
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils;
/** /**
@ -164,7 +164,7 @@ private static ContainerId getContainerId(ApplicationId applicationId,
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
getApplicationAttemptId(applicationId, startCount); getApplicationAttemptId(applicationId, startCount);
ContainerId containerId = ContainerId containerId =
BuilderUtils.newContainerId(appAttemptId, startCount); ContainerId.newInstance(appAttemptId, startCount);
return containerId; return containerId;
} }
@ -231,9 +231,9 @@ public void init(Configuration conf) {
this.clusterInfo.getMaxContainerCapability()); this.clusterInfo.getMaxContainerCapability());
} else { } else {
getContext().getClusterInfo().setMinContainerCapability( getContext().getClusterInfo().setMinContainerCapability(
BuilderUtils.newResource(1024, 1)); Resource.newInstance(1024, 1));
getContext().getClusterInfo().setMaxContainerCapability( getContext().getClusterInfo().setMaxContainerCapability(
BuilderUtils.newResource(10240, 1)); Resource.newInstance(10240, 1));
} }
} }
@ -517,8 +517,8 @@ public void handle(ContainerAllocatorEvent event) {
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class); ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(getContext().getApplicationAttemptId()); cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
cId.setId(containerCount++); cId.setId(containerCount++);
NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT); NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT);
Container container = BuilderUtils.newContainer(cId, nodeId, Container container = Container.newInstance(cId, nodeId,
NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0); NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0);
JobID id = TypeConverter.fromYarn(applicationId); JobID id = TypeConverter.fromYarn(applicationId);
JobId jobId = TypeConverter.toYarn(id); JobId jobId = TypeConverter.toYarn(id);

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -205,10 +206,10 @@ protected AMRMProtocol createSchedulerProxy() {
throws IOException { throws IOException {
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class); Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMinimumResourceCapability(BuilderUtils response.setMinimumResourceCapability(Resource.newInstance(
.newResource(1024, 1)); 1024, 1));
response.setMaximumResourceCapability(BuilderUtils response.setMaximumResourceCapability(Resource.newInstance(
.newResource(10240, 1)); 10240, 1));
return response; return response;
} }
@ -236,14 +237,13 @@ public AllocateResponse allocate(AllocateRequest request)
int numContainers = req.getNumContainers(); int numContainers = req.getNumContainers();
for (int i = 0; i < numContainers; i++) { for (int i = 0; i < numContainers; i++) {
ContainerId containerId = ContainerId containerId =
BuilderUtils.newContainerId( ContainerId.newInstance(
request.getApplicationAttemptId(), request.getApplicationAttemptId(),
request.getResponseId() + i); request.getResponseId() + i);
containers.add(BuilderUtils containers.add(Container.newInstance(containerId,
.newContainer(containerId, BuilderUtils.newNodeId("host" NodeId.newInstance("host" + containerId.getId(), 2345),
+ containerId.getId(), 2345), "host" + containerId.getId() + ":5678",
"host" + containerId.getId() + ":5678", req req.getCapability(), req.getPriority(), null, 0));
.getCapability(), req.getPriority(), null, 0));
} }
} }

View File

@ -622,9 +622,9 @@ public Configuration loadConfFile() throws IOException {
} }
private static AMInfo createAMInfo(int attempt) { private static AMInfo createAMInfo(int attempt) {
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
BuilderUtils.newApplicationId(100, 1), attempt); BuilderUtils.newApplicationId(100, 1), attempt);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(), return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(),
containerId, NM_HOST, NM_PORT, NM_HTTP_PORT); containerId, NM_HOST, NM_PORT, NM_HTTP_PORT);
} }

View File

@ -361,9 +361,9 @@ public void testMRAppMasterCredentials() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(12345, 56); ApplicationId appId = BuilderUtils.newApplicationId(12345, 56);
ApplicationAttemptId applicationAttemptId = ApplicationAttemptId applicationAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1); ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId containerId =
BuilderUtils.newContainerId(applicationAttemptId, 546); ContainerId.newInstance(applicationAttemptId, 546);
String userName = UserGroupInformation.getCurrentUser().getShortUserName(); String userName = UserGroupInformation.getCurrentUser().getShortUserName();
// Create staging dir, so MRAppMaster doesn't barf. // Create staging dir, so MRAppMaster doesn't barf.

View File

@ -496,7 +496,7 @@ public void testReportedAppProgress() throws Exception {
rm.sendAMLaunched(appAttemptId); rm.sendAMLaunched(appAttemptId);
rmDispatcher.await(); rmDispatcher.await();
MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId( MRApp mrApp = new MRApp(appAttemptId, ContainerId.newInstance(
appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) { appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) {
@Override @Override
protected Dispatcher createDispatcher() { protected Dispatcher createDispatcher() {
@ -612,7 +612,7 @@ private void finishTask(DrainDispatcher rmDispatcher, MockNM node,
MRApp mrApp, Task task) throws Exception { MRApp mrApp, Task task) throws Exception {
TaskAttempt attempt = task.getAttempts().values().iterator().next(); TaskAttempt attempt = task.getAttempts().values().iterator().next();
List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1); List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1);
contStatus.add(BuilderUtils.newContainerStatus(attempt.getAssignedContainerID(), contStatus.add(ContainerStatus.newInstance(attempt.getAssignedContainerID(),
ContainerState.COMPLETE, "", 0)); ContainerState.COMPLETE, "", 0));
Map<ApplicationId,List<ContainerStatus>> statusUpdate = Map<ApplicationId,List<ContainerStatus>> statusUpdate =
new HashMap<ApplicationId,List<ContainerStatus>>(1); new HashMap<ApplicationId,List<ContainerStatus>>(1);
@ -648,7 +648,7 @@ public void testReportedAppProgressWithOnlyMaps() throws Exception {
rm.sendAMLaunched(appAttemptId); rm.sendAMLaunched(appAttemptId);
rmDispatcher.await(); rmDispatcher.await();
MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId( MRApp mrApp = new MRApp(appAttemptId, ContainerId.newInstance(
appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) { appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) {
@Override @Override
protected Dispatcher createDispatcher() { protected Dispatcher createDispatcher() {
@ -1229,7 +1229,7 @@ public synchronized Allocation allocate(
List<ContainerId> release) { List<ContainerId> release) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) { for (ResourceRequest req : ask) {
ResourceRequest reqCopy = BuilderUtils.newResourceRequest(req ResourceRequest reqCopy = ResourceRequest.newInstance(req
.getPriority(), req.getHostName(), req.getCapability(), req .getPriority(), req.getHostName(), req.getCapability(), req
.getNumContainers()); .getNumContainers());
askCopy.add(reqCopy); askCopy.add(reqCopy);
@ -1255,7 +1255,7 @@ private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
} }
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
taskAttemptId); taskAttemptId);
Resource containerNeed = BuilderUtils.newResource(memory, 1); Resource containerNeed = Resource.newInstance(memory, 1);
if (earlierFailedAttempt) { if (earlierFailedAttempt) {
return ContainerRequestEvent return ContainerRequestEvent
.createContainerRequestEventForFailedContainer(attemptId, .createContainerRequestEventForFailedContainer(attemptId,
@ -1338,8 +1338,8 @@ private static AppContext createAppContext(
when(context.getApplicationAttemptId()).thenReturn(appAttemptId); when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
when(context.getJob(isA(JobId.class))).thenReturn(job); when(context.getJob(isA(JobId.class))).thenReturn(job);
when(context.getClusterInfo()).thenReturn( when(context.getClusterInfo()).thenReturn(
new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils new ClusterInfo(Resource.newInstance(1024, 1), Resource.newInstance(
.newResource(10240, 1))); 10240, 1)));
when(context.getEventHandler()).thenReturn(new EventHandler() { when(context.getEventHandler()).thenReturn(new EventHandler() {
@Override @Override
public void handle(Event event) { public void handle(Event event) {
@ -1412,12 +1412,12 @@ protected void unregister() {
@Override @Override
protected Resource getMinContainerCapability() { protected Resource getMinContainerCapability() {
return BuilderUtils.newResource(1024, 1); return Resource.newInstance(1024, 1);
} }
@Override @Override
protected Resource getMaxContainerCapability() { protected Resource getMaxContainerCapability() {
return BuilderUtils.newResource(10240, 1); return Resource.newInstance(10240, 1);
} }
public void sendRequest(ContainerRequestEvent req) { public void sendRequest(ContainerRequestEvent req) {
@ -1665,11 +1665,14 @@ public void testCompletedContainerEvent() {
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId( TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
MRBuilderUtils.newTaskId( MRBuilderUtils.newTaskId(
MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1); MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1); ApplicationId applicationId = BuilderUtils.newApplicationId(1, 1);
ContainerStatus status = BuilderUtils.newContainerStatus( ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(
applicationId, 1);
ContainerId containerId = ContainerId.newInstance(applicationAttemptId, 1);
ContainerStatus status = ContainerStatus.newInstance(
containerId, ContainerState.RUNNING, "", 0); containerId, ContainerState.RUNNING, "", 0);
ContainerStatus abortedStatus = BuilderUtils.newContainerStatus( ContainerStatus abortedStatus = ContainerStatus.newInstance(
containerId, ContainerState.RUNNING, "", containerId, ContainerState.RUNNING, "",
ContainerExitStatus.ABORTED); ContainerExitStatus.ABORTED);

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
@ -219,7 +220,7 @@ private class TestMRApp extends MRAppMaster {
public TestMRApp(ApplicationAttemptId applicationAttemptId, public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator, int maxAppAttempts) { ContainerAllocator allocator, int maxAppAttempts) {
super(applicationAttemptId, BuilderUtils.newContainerId( super(applicationAttemptId, ContainerId.newInstance(
applicationAttemptId, 1), "testhost", 2222, 3333, applicationAttemptId, 1), "testhost", 2222, 3333,
System.currentTimeMillis(), maxAppAttempts); System.currentTimeMillis(), maxAppAttempts);
this.allocator = allocator; this.allocator = allocator;

View File

@ -198,8 +198,8 @@ public void verifySlotMillis(int mapMemMb, int reduceMemMb,
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb); conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb); conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
app.setClusterInfo(new ClusterInfo(BuilderUtils app.setClusterInfo(new ClusterInfo(Resource
.newResource(minContainerSize, 1), BuilderUtils.newResource(10240, 1))); .newInstance(minContainerSize, 1), Resource.newInstance(10240, 1)));
Job job = app.submit(conf); Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING); app.waitForState(job, JobState.RUNNING);
@ -320,7 +320,7 @@ public void handle(JobHistoryEvent event) {
public void testLaunchFailedWhileKilling() throws Exception { public void testLaunchFailedWhileKilling() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2); ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0); ApplicationAttemptId.newInstance(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
@ -345,8 +345,8 @@ public void testLaunchFailedWhileKilling() throws Exception {
mock(Token.class), new Credentials(), mock(Token.class), new Credentials(),
new SystemClock(), null); new SystemClock(), null);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class); Container container = mock(Container.class);
when(container.getId()).thenReturn(contId); when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid); when(container.getNodeId()).thenReturn(nid);
@ -370,7 +370,7 @@ public void testLaunchFailedWhileKilling() throws Exception {
public void testContainerCleanedWhileRunning() throws Exception { public void testContainerCleanedWhileRunning() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2); ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0); ApplicationAttemptId.newInstance(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
@ -402,8 +402,8 @@ public void testContainerCleanedWhileRunning() throws Exception {
mock(Token.class), new Credentials(), mock(Token.class), new Credentials(),
new SystemClock(), appCtx); new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.2", 0); NodeId nid = NodeId.newInstance("127.0.0.2", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class); Container container = mock(Container.class);
when(container.getId()).thenReturn(contId); when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid); when(container.getNodeId()).thenReturn(nid);
@ -428,7 +428,7 @@ public void testContainerCleanedWhileRunning() throws Exception {
public void testContainerCleanedWhileCommitting() throws Exception { public void testContainerCleanedWhileCommitting() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2); ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0); ApplicationAttemptId.newInstance(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
@ -460,8 +460,8 @@ public void testContainerCleanedWhileCommitting() throws Exception {
mock(Token.class), new Credentials(), mock(Token.class), new Credentials(),
new SystemClock(), appCtx); new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class); Container container = mock(Container.class);
when(container.getId()).thenReturn(contId); when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid); when(container.getNodeId()).thenReturn(nid);
@ -489,7 +489,7 @@ public void testContainerCleanedWhileCommitting() throws Exception {
public void testDoubleTooManyFetchFailure() throws Exception { public void testDoubleTooManyFetchFailure() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2); ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0); ApplicationAttemptId.newInstance(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
@ -521,8 +521,8 @@ public void testDoubleTooManyFetchFailure() throws Exception {
mock(Token.class), new Credentials(), mock(Token.class), new Credentials(),
new SystemClock(), appCtx); new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class); Container container = mock(Container.class);
when(container.getId()).thenReturn(contId); when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid); when(container.getNodeId()).thenReturn(nid);
@ -555,7 +555,7 @@ public void testDoubleTooManyFetchFailure() throws Exception {
@Test @Test
public void testAppDiognosticEventOnUnassignedTask() throws Exception { public void testAppDiognosticEventOnUnassignedTask() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2); ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0); appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
@ -587,8 +587,8 @@ public void testAppDiognosticEventOnUnassignedTask() throws Exception {
jobFile, 1, splits, jobConf, taListener, jobFile, 1, splits, jobConf, taListener,
mock(Token.class), new Credentials(), new SystemClock(), appCtx); mock(Token.class), new Credentials(), new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class); Container container = mock(Container.class);
when(container.getId()).thenReturn(contId); when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid); when(container.getNodeId()).thenReturn(nid);
@ -605,7 +605,7 @@ public void testAppDiognosticEventOnUnassignedTask() throws Exception {
@Test @Test
public void testAppDiognosticEventOnNewTask() throws Exception { public void testAppDiognosticEventOnNewTask() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2); ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0); appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
@ -637,8 +637,8 @@ public void testAppDiognosticEventOnNewTask() throws Exception {
jobFile, 1, splits, jobConf, taListener, jobFile, 1, splits, jobConf, taListener,
mock(Token.class), new Credentials(), new SystemClock(), appCtx); mock(Token.class), new Credentials(), new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class); Container container = mock(Container.class);
when(container.getId()).thenReturn(contId); when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid); when(container.getNodeId()).thenReturn(nid);

View File

@ -83,7 +83,7 @@ public class TestContainerLauncher {
public void testPoolSize() throws InterruptedException { public void testPoolSize() throws InterruptedException {
ApplicationId appId = BuilderUtils.newApplicationId(12345, 67); ApplicationId appId = BuilderUtils.newApplicationId(12345, 67);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 3); appId, 3);
JobId jobId = MRBuilderUtils.newJobId(appId, 8); JobId jobId = MRBuilderUtils.newJobId(appId, 8);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP);
@ -104,7 +104,7 @@ public void testPoolSize() throws InterruptedException {
containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, i); ContainerId containerId = ContainerId.newInstance(appAttemptId, i);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i); TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host" + i + ":1234", null, containerId, "host" + i + ":1234", null,
@ -126,7 +126,7 @@ public void testPoolSize() throws InterruptedException {
Assert.assertEquals(10, containerLauncher.numEventsProcessed.get()); Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
containerLauncher.finishEventHandling = false; containerLauncher.finishEventHandling = false;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, ContainerId containerId = ContainerId.newInstance(appAttemptId,
i + 10); i + 10);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId,
i + 10); i + 10);
@ -143,7 +143,7 @@ public void testPoolSize() throws InterruptedException {
// Core pool size should be 21 but the live pool size should be only 11. // Core pool size should be 21 but the live pool size should be only 11.
containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE; containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
containerLauncher.finishEventHandling = false; containerLauncher.finishEventHandling = false;
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21); ContainerId containerId = ContainerId.newInstance(appAttemptId, 21);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21); TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host11:1234", null, containerId, "host11:1234", null,
@ -158,12 +158,12 @@ public void testPoolSize() throws InterruptedException {
@Test @Test
public void testPoolLimits() throws InterruptedException { public void testPoolLimits() throws InterruptedException {
ApplicationId appId = BuilderUtils.newApplicationId(12345, 67); ApplicationId appId = BuilderUtils.newApplicationId(12345, 67);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 3); appId, 3);
JobId jobId = MRBuilderUtils.newJobId(appId, 8); JobId jobId = MRBuilderUtils.newJobId(appId, 8);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10); ContainerId containerId = ContainerId.newInstance(appAttemptId, 10);
AppContext context = mock(AppContext.class); AppContext context = mock(AppContext.class);
CustomContainerLauncher containerLauncher = new CustomContainerLauncher( CustomContainerLauncher containerLauncher = new CustomContainerLauncher(

View File

@ -19,12 +19,11 @@
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.atLeast;
import org.mockito.ArgumentCaptor;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -53,9 +52,12 @@
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@ -66,6 +68,7 @@
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestContainerLauncherImpl { public class TestContainerLauncherImpl {
static final Log LOG = LogFactory.getLog(TestContainerLauncherImpl.class); static final Log LOG = LogFactory.getLog(TestContainerLauncherImpl.class);
@ -122,8 +125,8 @@ public void waitForPoolToIdle() throws InterruptedException {
public static ContainerId makeContainerId(long ts, int appId, int attemptId, public static ContainerId makeContainerId(long ts, int appId, int attemptId,
int id) { int id) {
return BuilderUtils.newContainerId( return ContainerId.newInstance(
BuilderUtils.newApplicationAttemptId( ApplicationAttemptId.newInstance(
BuilderUtils.newApplicationId(ts, appId), attemptId), id); BuilderUtils.newApplicationId(ts, appId), attemptId), id);
} }
@ -406,10 +409,10 @@ public void testContainerCleaned() throws Exception {
private ContainerToken createNewContainerToken(ContainerId contId, private ContainerToken createNewContainerToken(ContainerId contId,
String containerManagerAddr) { String containerManagerAddr) {
return BuilderUtils.newContainerToken(BuilderUtils.newNodeId("127.0.0.1", return BuilderUtils.newContainerToken(NodeId.newInstance("127.0.0.1",
1234), "password".getBytes(), new ContainerTokenIdentifier( 1234), "password".getBytes(), new ContainerTokenIdentifier(
contId, containerManagerAddr, "user", contId, containerManagerAddr, "user",
BuilderUtils.newResource(1024, 1), Resource.newInstance(1024, 1),
System.currentTimeMillis() + 10000L, 123)); System.currentTimeMillis() + 10000L, 123));
} }

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
@ -108,7 +109,7 @@ protected AMRMProtocol createSchedulerProxy() {
private static AppContext createAppContext() { private static AppContext createAppContext() {
ApplicationId appId = BuilderUtils.newApplicationId(1, 1); ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId attemptId =
BuilderUtils.newApplicationAttemptId(appId, 1); ApplicationAttemptId.newInstance(appId, 1);
Job job = mock(Job.class); Job job = mock(Job.class);
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
EventHandler eventHandler = mock(EventHandler.class); EventHandler eventHandler = mock(EventHandler.class);
@ -117,8 +118,8 @@ private static AppContext createAppContext() {
when(ctx.getApplicationAttemptId()).thenReturn(attemptId); when(ctx.getApplicationAttemptId()).thenReturn(attemptId);
when(ctx.getJob(isA(JobId.class))).thenReturn(job); when(ctx.getJob(isA(JobId.class))).thenReturn(job);
when(ctx.getClusterInfo()).thenReturn( when(ctx.getClusterInfo()).thenReturn(
new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils new ClusterInfo(Resource.newInstance(1024, 1), Resource.newInstance(
.newResource(10240, 1))); 10240, 1)));
when(ctx.getEventHandler()).thenReturn(eventHandler); when(ctx.getEventHandler()).thenReturn(eventHandler);
return ctx; return ctx;
} }

View File

@ -46,8 +46,8 @@
import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils; import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
@ -66,9 +66,9 @@
import com.google.inject.servlet.GuiceServletContextListener; import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule; import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.JerseyTest; import com.sun.jersey.test.framework.JerseyTest;
import com.sun.jersey.test.framework.WebAppDescriptor; import com.sun.jersey.test.framework.WebAppDescriptor;
@ -967,7 +967,7 @@ public void verifyJobAttemptsGeneric(Job job, String nodeHttpAddress,
WebServicesTestUtils.checkStringMatch("nodeHttpAddress", nmHost + ":" WebServicesTestUtils.checkStringMatch("nodeHttpAddress", nmHost + ":"
+ nmHttpPort, nodeHttpAddress); + nmHttpPort, nodeHttpAddress);
WebServicesTestUtils.checkStringMatch("nodeId", WebServicesTestUtils.checkStringMatch("nodeId",
BuilderUtils.newNodeId(nmHost, nmPort).toString(), nodeId); NodeId.newInstance(nmHost, nmPort).toString(), nodeId);
assertTrue("startime not greater than 0", startTime > 0); assertTrue("startime not greater than 0", startTime > 0);
WebServicesTestUtils.checkStringMatch("containerId", amInfo WebServicesTestUtils.checkStringMatch("containerId", amInfo
.getContainerId().toString(), containerId); .getContainerId().toString(), containerId);

View File

@ -58,7 +58,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ApplicationClassLoader; import org.apache.hadoop.yarn.util.ApplicationClassLoader;
import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
/** /**
@ -435,15 +434,10 @@ private static void parseDistributedCacheArtifacts(
" This will be an error in Hadoop 2.0"); " This will be an error in Hadoop 2.0");
continue; continue;
} }
localResources.put( localResources.put(linkName, LocalResource.newInstance(ConverterUtils
linkName, .getYarnUrlFromURI(p.toUri()), type, visibilities[i]
BuilderUtils.newLocalResource( ? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE,
p.toUri(), type, sizes[i], timestamps[i]));
visibilities[i]
? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i])
);
} }
} }
} }

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.BuilderUtils;
@XmlRootElement(name = "jobAttempt") @XmlRootElement(name = "jobAttempt")
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
@ -56,7 +55,7 @@ public AMAttemptInfo(AMInfo amInfo, String jobId, String user, String host,
int nmPort = amInfo.getNodeManagerPort(); int nmPort = amInfo.getNodeManagerPort();
if (nmHost != null) { if (nmHost != null) {
this.nodeHttpAddress = nmHost + ":" + nmHttpPort; this.nodeHttpAddress = nmHost + ":" + nmHttpPort;
NodeId nodeId = BuilderUtils.newNodeId(nmHost, nmPort); NodeId nodeId = NodeId.newInstance(nmHost, nmPort);
this.nodeId = nodeId.toString(); this.nodeId = nodeId.toString();
} }

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
@ -243,7 +244,7 @@ public void testLogsView2() throws IOException {
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1) params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
.toString()); .toString());
params.put(NM_NODENAME, params.put(NM_NODENAME,
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString()); NodeId.newInstance(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
params.put(ENTITY_STRING, "container_10_0001_01_000001"); params.put(ENTITY_STRING, "container_10_0001_01_000001");
params.put(APP_OWNER, "owner"); params.put(APP_OWNER, "owner");
@ -271,7 +272,7 @@ public void testLogsViewSingle() throws IOException {
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1) params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
.toString()); .toString());
params.put(NM_NODENAME, params.put(NM_NODENAME,
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString()); NodeId.newInstance(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
params.put(ENTITY_STRING, "container_10_0001_01_000001"); params.put(ENTITY_STRING, "container_10_0001_01_000001");
params.put(APP_OWNER, "owner"); params.put(APP_OWNER, "owner");
@ -302,7 +303,7 @@ public void testLogsViewBadStartEnd() throws IOException {
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1) params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
.toString()); .toString());
params.put(NM_NODENAME, params.put(NM_NODENAME,
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString()); NodeId.newInstance(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
params.put(ENTITY_STRING, "container_10_0001_01_000001"); params.put(ENTITY_STRING, "container_10_0001_01_000001");
params.put(APP_OWNER, "owner"); params.put(APP_OWNER, "owner");

View File

@ -53,8 +53,8 @@
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils; import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
@ -913,7 +913,7 @@ public void verifyHsJobAttemptsGeneric(Job job, String nodeHttpAddress,
WebServicesTestUtils.checkStringMatch("nodeHttpAddress", nmHost + ":" WebServicesTestUtils.checkStringMatch("nodeHttpAddress", nmHost + ":"
+ nmHttpPort, nodeHttpAddress); + nmHttpPort, nodeHttpAddress);
WebServicesTestUtils.checkStringMatch("nodeId", WebServicesTestUtils.checkStringMatch("nodeId",
BuilderUtils.newNodeId(nmHost, nmPort).toString(), nodeId); NodeId.newInstance(nmHost, nmPort).toString(), nodeId);
assertTrue("startime not greater than 0", startTime > 0); assertTrue("startime not greater than 0", startTime > 0);
WebServicesTestUtils.checkStringMatch("containerId", amInfo WebServicesTestUtils.checkStringMatch("containerId", amInfo
.getContainerId().toString(), containerId); .getContainerId().toString(), containerId);

View File

@ -71,6 +71,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ClientToken; import org.apache.hadoop.yarn.api.records.ClientToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -478,7 +479,7 @@ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
taReport.getContainerId().toString(), taReport.getContainerId().toString(),
taReport.getContainerId().getApplicationAttemptId() taReport.getContainerId().getApplicationAttemptId()
.getApplicationId().toString(), .getApplicationId().toString(),
BuilderUtils.newNodeId(taReport.getNodeManagerHost(), NodeId.newInstance(taReport.getNodeManagerHost(),
taReport.getNodeManagerPort()).toString(), report.getUser()); taReport.getNodeManagerPort()).toString(), report.getUser());
} else { } else {
if (report.getAMInfos() == null || report.getAMInfos().size() == 0) { if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
@ -489,7 +490,7 @@ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
return new LogParams( return new LogParams(
amInfo.getContainerId().toString(), amInfo.getContainerId().toString(),
amInfo.getAppAttemptId().getApplicationId().toString(), amInfo.getAppAttemptId().getApplicationId().toString(),
BuilderUtils.newNodeId(amInfo.getNodeManagerHost(), NodeId.newInstance(amInfo.getNodeManagerHost(),
amInfo.getNodeManagerPort()).toString(), report.getUser()); amInfo.getNodeManagerPort()).toString(), report.getUser());
} }
} else { } else {

View File

@ -88,7 +88,7 @@ private ApplicationReport getUnknownApplicationReport() {
// Setting AppState to NEW and finalStatus to UNDEFINED as they are never // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
// used for a non running job // used for a non running job
return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId, return ApplicationReport.newInstance(unknownAppId, unknownAttemptId,
"N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A", "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
"N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
YarnConfiguration.DEFAULT_APPLICATION_TYPE); YarnConfiguration.DEFAULT_APPLICATION_TYPE);

View File

@ -84,7 +84,6 @@
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMTokenSelector; import org.apache.hadoop.yarn.security.client.RMTokenSelector;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.hadoop.yarn.util.ProtoUtils;
@ -475,9 +474,10 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
// Setup ContainerLaunchContext for AM container // Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer = BuilderUtils ContainerLaunchContext amContainer =
.newContainerLaunchContext(localResources, ContainerLaunchContext.newInstance(localResources, environment,
environment, vargsFinal, null, securityTokens, acls); vargsFinal, null, securityTokens, acls);
// Set up the ApplicationSubmissionContext // Set up the ApplicationSubmissionContext
ApplicationSubmissionContext appContext = ApplicationSubmissionContext appContext =

View File

@ -426,9 +426,9 @@ private GetCountersRequest getCountersRequest() {
private ApplicationReport getFinishedApplicationReport() { private ApplicationReport getFinishedApplicationReport() {
ApplicationId appId = BuilderUtils.newApplicationId(1234, 5); ApplicationId appId = BuilderUtils.newApplicationId(1234, 5);
ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId( ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
appId, 0); appId, 0);
return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue", return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
"appname", "host", 124, null, YarnApplicationState.FINISHED, "appname", "host", 124, null, YarnApplicationState.FINISHED,
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
"N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE); "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE);
@ -436,9 +436,9 @@ private ApplicationReport getFinishedApplicationReport() {
private ApplicationReport getRunningApplicationReport(String host, int port) { private ApplicationReport getRunningApplicationReport(String host, int port) {
ApplicationId appId = BuilderUtils.newApplicationId(1234, 5); ApplicationId appId = BuilderUtils.newApplicationId(1234, 5);
ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId( ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
appId, 0); appId, 0);
return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue", return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
"appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics", "appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics",
"url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
YarnConfiguration.DEFAULT_APPLICATION_TYPE); YarnConfiguration.DEFAULT_APPLICATION_TYPE);

View File

@ -48,10 +48,9 @@
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.Before;
import org.junit.After; import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestMRJobsWithHistoryService { public class TestMRJobsWithHistoryService {
@ -169,8 +168,8 @@ private void verifyJobReport(JobReport jobReport, JobId jobId) {
List<AMInfo> amInfos = jobReport.getAMInfos(); List<AMInfo> amInfos = jobReport.getAMInfos();
Assert.assertEquals(1, amInfos.size()); Assert.assertEquals(1, amInfos.size());
AMInfo amInfo = amInfos.get(0); AMInfo amInfo = amInfos.get(0);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(jobId.getAppId(), 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(jobId.getAppId(), 1);
ContainerId amContainerId = BuilderUtils.newContainerId(appAttemptId, 1); ContainerId amContainerId = ContainerId.newInstance(appAttemptId, 1);
Assert.assertEquals(appAttemptId, amInfo.getAppAttemptId()); Assert.assertEquals(appAttemptId, amInfo.getAppAttemptId());
Assert.assertEquals(amContainerId, amInfo.getContainerId()); Assert.assertEquals(amContainerId, amInfo.getContainerId());
Assert.assertTrue(jobReport.getSubmitTime() > 0); Assert.assertTrue(jobReport.getSubmitTime() > 0);