YARN-735. Make ApplicationAttemptId, ContainerId and NodeId immutable. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1488439 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-06-01 00:14:31 +00:00
parent b65e7ed920
commit 39f019f413
56 changed files with 219 additions and 399 deletions

View File

@ -46,7 +46,6 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
/**
* Allocates containers locally. Doesn't allocate a real container;
@ -134,16 +133,13 @@ protected synchronized void heartbeat() throws Exception {
public void handle(ContainerAllocatorEvent event) {
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
LOG.info("Processing the event " + event.toString());
ContainerId cID = recordFactory.newRecordInstance(ContainerId.class);
cID.setApplicationAttemptId(applicationAttemptId);
// Assign the same container ID as the AM
cID.setId(this.containerId.getId());
ContainerId cID =
ContainerId.newInstance(applicationAttemptId,
this.containerId.getId());
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(cID);
NodeId nodeId = Records.newRecord(NodeId.class);
nodeId.setHost(this.nmHost);
nodeId.setPort(this.nmPort);
NodeId nodeId = NodeId.newInstance(this.nmHost, this.nmPort);
container.setNodeId(nodeId);
container.setContainerToken(null);
container.setNodeHttpAddress(this.nmHost + ":" + this.nmHttpPort);

View File

@ -94,8 +94,6 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.state.StateMachine;
@ -122,9 +120,6 @@ public class MRApp extends MRAppMaster {
public static int NM_PORT = 1234;
public static int NM_HTTP_PORT = 8042;
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
//if true, tasks complete automatically as soon as they are launched
protected boolean autoComplete = false;
@ -154,9 +149,7 @@ protected void downloadTokensAndSetupUGI(Configuration conf) {
private static ApplicationAttemptId getApplicationAttemptId(
ApplicationId applicationId, int startCount) {
ApplicationAttemptId applicationAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
applicationAttemptId.setApplicationId(applicationId);
applicationAttemptId.setAttemptId(startCount);
ApplicationAttemptId.newInstance(applicationId, startCount);
return applicationAttemptId;
}

View File

@ -140,18 +140,15 @@ public void run() {
if (concurrentRunningTasks < maxConcurrentRunningTasks) {
event = eventQueue.take();
ContainerId cId =
recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(
getContext().getApplicationAttemptId());
cId.setId(containerCount++);
ContainerId.newInstance(getContext()
.getApplicationAttemptId(), containerCount++);
//System.out.println("Allocating " + containerCount);
Container container =
recordFactory.newRecordInstance(Container.class);
container.setId(cId);
NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
nodeId.setHost("dummy");
nodeId.setPort(1234);
NodeId nodeId = NodeId.newInstance("dummy", 1234);
container.setNodeId(nodeId);
container.setContainerToken(null);
container.setNodeHttpAddress("localhost:8042");

View File

@ -299,12 +299,10 @@ public boolean isFinished() {
@Override
public ContainerId getAssignedContainerID() {
ContainerId id = Records.newRecord(ContainerId.class);
ApplicationAttemptId appAttemptId = Records
.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(taid.getTaskId().getJobId().getAppId());
appAttemptId.setAttemptId(0);
id.setApplicationAttemptId(appAttemptId);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(taid.getTaskId().getJobId()
.getAppId(), 0);
ContainerId id = ContainerId.newInstance(appAttemptId, 0);
return id;
}

View File

@ -791,10 +791,7 @@ class MyAppContext implements AppContext {
MyAppContext(int numberMaps, int numberReduces) {
myApplicationID = ApplicationId.newInstance(clock.getTime(), 1);
myAppAttemptID = recordFactory
.newRecordInstance(ApplicationAttemptId.class);
myAppAttemptID.setApplicationId(myApplicationID);
myAppAttemptID.setAttemptId(0);
myAppAttemptID = ApplicationAttemptId.newInstance(myApplicationID, 0);
myJobID = recordFactory.newRecordInstance(JobId.class);
myJobID.setAppId(myApplicationID);

View File

@ -56,7 +56,6 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
@ -81,12 +80,9 @@ public void testDeletionofStaging() throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
attemptId.setApplicationId(appId);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 0);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
@ -109,12 +105,9 @@ public void testNoDeletionofStagingOnReboot() throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
attemptId.setApplicationId(appId);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 0);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
@ -136,12 +129,9 @@ public void testDeletionofStagingOnReboot() throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(1);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
attemptId.setApplicationId(appId);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
JobStateInternal.REBOOT, 1); //no retry
@ -163,12 +153,9 @@ public void testDeletionofStagingOnKill() throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
attemptId.setApplicationId(appId);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 0);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
@ -190,12 +177,9 @@ public void testDeletionofStagingOnKillLastTry() throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(1);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
attemptId.setApplicationId(appId);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);

View File

@ -72,6 +72,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -557,8 +558,9 @@ private boolean testUberDecision(Configuration conf) {
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
MRAppMetrics mrAppMetrics = MRAppMetrics.create();
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
JobImpl job =
new JobImpl(jobId, ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 0), 0), conf, mock(EventHandler.class),
null, new JobTokenSecretManager(), new Credentials(), null, null,
mrAppMetrics, null, true, null, 0, null, null, null, null);
InitTransition initTransition = getInitTransition(2);
@ -649,8 +651,8 @@ private static StubbedJob createStubbedJob(Configuration conf,
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
StubbedJob job = new StubbedJob(jobId,
Records.newRecord(ApplicationAttemptId.class), conf,
dispatcher.getEventHandler(), true, "somebody", numSplits);
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
conf,dispatcher.getEventHandler(), true, "somebody", numSplits);
dispatcher.register(JobEventType.class, job);
EventHandler mockHandler = mock(EventHandler.class);
dispatcher.register(TaskEventType.class, mockHandler);

View File

@ -55,7 +55,7 @@ static class TestAppContext implements AppContext {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
}

View File

@ -83,7 +83,7 @@ static class TestAppContext implements AppContext {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
}

View File

@ -93,7 +93,7 @@ static class TestAppContext implements AppContext {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
}

View File

@ -95,7 +95,7 @@ static class TestAppContext implements AppContext {
TestAppContext(int appid, int numTasks, int numAttempts, Path confPath) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
Map<JobId, Job> map = Maps.newHashMap();
Job job = MockJobs.newJob(appID, 0, numTasks, numAttempts, confPath);
map.put(job.getID(), job);

View File

@ -96,7 +96,7 @@ static class TestAppContext implements AppContext {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
}

View File

@ -90,7 +90,7 @@ static class TestAppContext implements AppContext {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
}

View File

@ -71,7 +71,7 @@ static class TestAppContext implements AppContext {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts,
boolean hasFailedTasks) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts,
hasFailedTasks);
}

View File

@ -89,7 +89,7 @@ static class TestAppContext implements HistoryContext {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
}

View File

@ -101,7 +101,7 @@ static class TestAppContext implements HistoryContext {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
}

View File

@ -102,7 +102,7 @@ static class TestAppContext implements HistoryContext {
TestAppContext(int appid, int numTasks, int numAttempts, Path confPath) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
Map<JobId, Job> map = Maps.newHashMap();
Job job = MockJobs.newJob(appID, 0, numTasks, numAttempts, confPath);
map.put(job.getID(), job);

View File

@ -105,7 +105,7 @@ static class TestAppContext implements HistoryContext {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts,
boolean hasFailedTasks) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
JobsPair jobs;
try {
jobs = MockHistoryJobs.newHistoryJobs(appID, numJobs, numTasks,

View File

@ -97,7 +97,7 @@ static class TestAppContext implements HistoryContext {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
appAttemptID = ApplicationAttemptId.newInstance(appID, 0);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
}

View File

@ -78,6 +78,9 @@ Release 2.0.5-beta - UNRELEASED
ContainerTokenIdentifier instead of the entire Container.
(Vinod Kumar Vavilapalli via sseth)
YARN-735. Make ApplicationAttemptId, ContaienrId and NodeId immutable.
(Jian He via sseth)
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
@ -48,6 +47,7 @@ public static ApplicationAttemptId newInstance(ApplicationId appId,
Records.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(attemptId);
appAttemptId.build();
return appAttemptId;
}
@ -60,8 +60,7 @@ public static ApplicationAttemptId newInstance(ApplicationId appId,
public abstract ApplicationId getApplicationId();
@Private
@Unstable
public abstract void setApplicationId(ApplicationId appID);
protected abstract void setApplicationId(ApplicationId appID);
/**
* Get the <code>attempt id</code> of the <code>Application</code>.
@ -70,8 +69,7 @@ public static ApplicationAttemptId newInstance(ApplicationId appId,
public abstract int getAttemptId();
@Private
@Unstable
public abstract void setAttemptId(int attemptId);
protected abstract void setAttemptId(int attemptId);
static final ThreadLocal<NumberFormat> attemptIdFormat =
new ThreadLocal<NumberFormat>() {
@ -131,4 +129,6 @@ public String toString() {
sb.append("_").append(attemptIdFormat.get().format(getAttemptId()));
return sb.toString();
}
protected abstract void build();
}

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
@ -40,6 +39,7 @@ public static ContainerId newInstance(ApplicationAttemptId appAttemptId,
ContainerId id = Records.newRecord(ContainerId.class);
id.setId(containerId);
id.setApplicationAttemptId(appAttemptId);
id.build();
return id;
}
@ -54,8 +54,7 @@ public static ContainerId newInstance(ApplicationAttemptId appAttemptId,
public abstract ApplicationAttemptId getApplicationAttemptId();
@Private
@Unstable
public abstract void setApplicationAttemptId(ApplicationAttemptId atId);
protected abstract void setApplicationAttemptId(ApplicationAttemptId atId);
/**
* Get the identifier of the <code>ContainerId</code>.
@ -66,8 +65,7 @@ public static ContainerId newInstance(ApplicationAttemptId appAttemptId,
public abstract int getId();
@Private
@Unstable
public abstract void setId(int id);
protected abstract void setId(int id);
// TODO: fail the app submission if attempts are more than 10 or something
@ -146,4 +144,6 @@ public String toString() {
sb.append(containerIdFormat.get().format(getId()));
return sb.toString();
}
protected abstract void build();
}

View File

@ -21,7 +21,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
@ -40,6 +39,7 @@ public static NodeId newInstance(String host, int port) {
NodeId nodeId = Records.newRecord(NodeId.class);
nodeId.setHost(host);
nodeId.setPort(port);
nodeId.build();
return nodeId;
}
@ -52,8 +52,7 @@ public static NodeId newInstance(String host, int port) {
public abstract String getHost();
@Private
@Unstable
public abstract void setHost(String host);
protected abstract void setHost(String host);
/**
* Get the <em>port</em> for communicating with the node.
@ -64,8 +63,7 @@ public static NodeId newInstance(String host, int port) {
public abstract int getPort();
@Private
@Unstable
public abstract void setPort(int port);
protected abstract void setPort(int port);
@Override
public String toString() {
@ -111,4 +109,5 @@ public int compareTo(NodeId other) {
return hostCompare;
}
protected abstract void build();
}

View File

@ -21,15 +21,13 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
public class ApplicationAttemptIdPBImpl extends ApplicationAttemptId {
ApplicationAttemptIdProto proto = ApplicationAttemptIdProto
.getDefaultInstance();
ApplicationAttemptIdProto.Builder builder = null;
boolean viaProto = false;
import com.google.common.base.Preconditions;
public class ApplicationAttemptIdPBImpl extends ApplicationAttemptId {
ApplicationAttemptIdProto proto = null;
ApplicationAttemptIdProto.Builder builder = null;
private ApplicationId applicationId = null;
public ApplicationAttemptIdPBImpl() {
@ -38,69 +36,36 @@ public ApplicationAttemptIdPBImpl() {
public ApplicationAttemptIdPBImpl(ApplicationAttemptIdProto proto) {
this.proto = proto;
viaProto = true;
this.applicationId = convertFromProtoFormat(proto.getApplicationId());
}
public synchronized ApplicationAttemptIdProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
public ApplicationAttemptIdProto getProto() {
return proto;
}
private synchronized void mergeLocalToBuilder() {
if (this.applicationId != null
&& !((ApplicationIdPBImpl) applicationId).getProto().equals(
builder.getApplicationId())) {
builder.setApplicationId(convertToProtoFormat(this.applicationId));
}
}
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ApplicationAttemptIdProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized int getAttemptId() {
ApplicationAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
return (p.getAttemptId());
public int getAttemptId() {
Preconditions.checkNotNull(proto);
return proto.getAttemptId();
}
@Override
public synchronized void setAttemptId(int attemptId) {
maybeInitBuilder();
builder.setAttemptId((attemptId));
protected void setAttemptId(int attemptId) {
Preconditions.checkNotNull(builder);
builder.setAttemptId(attemptId);
}
@Override
public synchronized ApplicationId getApplicationId() {
ApplicationAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
if (this.applicationId != null) {
return this.applicationId;
}
if (!p.hasApplicationId()) {
return null;
}
this.applicationId = convertFromProtoFormat(p.getApplicationId());
public ApplicationId getApplicationId() {
return this.applicationId;
}
@Override
public synchronized void setApplicationId(ApplicationId appId) {
maybeInitBuilder();
if (appId == null)
builder.clearApplicationId();
public void setApplicationId(ApplicationId appId) {
if (appId != null) {
Preconditions.checkNotNull(builder);
builder.setApplicationId(convertToProtoFormat(appId));
}
this.applicationId = appId;
}
@ -111,4 +76,10 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl)t).getProto();
}
@Override
protected void build() {
proto = builder.build();
builder = null;
}
}

View File

@ -49,6 +49,7 @@ public int getId() {
@Override
protected void setId(int id) {
Preconditions.checkNotNull(builder);
builder.setId(id);
}
@Override
@ -59,11 +60,13 @@ public long getClusterTimestamp() {
@Override
protected void setClusterTimestamp(long clusterTimestamp) {
Preconditions.checkNotNull(builder);
builder.setClusterTimestamp((clusterTimestamp));
}
@Override
protected void build() {
proto = builder.build();
builder = null;
}
}

View File

@ -22,14 +22,13 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProtoOrBuilder;
import com.google.common.base.Preconditions;
public class ContainerIdPBImpl extends ContainerId {
ContainerIdProto proto = ContainerIdProto.getDefaultInstance();
ContainerIdProto proto = null;
ContainerIdProto.Builder builder = null;
boolean viaProto = false;
private ApplicationAttemptId applicationAttemptId = null;
public ContainerIdPBImpl() {
@ -38,71 +37,37 @@ public ContainerIdPBImpl() {
public ContainerIdPBImpl(ContainerIdProto proto) {
this.proto = proto;
viaProto = true;
this.applicationAttemptId = convertFromProtoFormat(proto.getAppAttemptId());
}
public synchronized ContainerIdProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
public ContainerIdProto getProto() {
return proto;
}
private synchronized void mergeLocalToBuilder() {
if (this.applicationAttemptId != null && !
((ApplicationAttemptIdPBImpl)applicationAttemptId).getProto().equals(
builder.getAppAttemptId())) {
builder.setAppAttemptId(convertToProtoFormat(this.applicationAttemptId));
}
}
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ContainerIdProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized int getId() {
ContainerIdProtoOrBuilder p = viaProto ? proto : builder;
return (p.getId());
public int getId() {
Preconditions.checkNotNull(proto);
return proto.getId();
}
@Override
public synchronized void setId(int id) {
maybeInitBuilder();
protected void setId(int id) {
Preconditions.checkNotNull(builder);
builder.setId((id));
}
@Override
public synchronized ApplicationAttemptId getApplicationAttemptId() {
ContainerIdProtoOrBuilder p = viaProto ? proto : builder;
if (this.applicationAttemptId != null) {
return this.applicationAttemptId;
}
if (!p.hasAppAttemptId()) {
return null;
}
this.applicationAttemptId = convertFromProtoFormat(p.getAppAttemptId());
public ApplicationAttemptId getApplicationAttemptId() {
return this.applicationAttemptId;
}
@Override
public synchronized void setApplicationAttemptId(ApplicationAttemptId atId) {
maybeInitBuilder();
if (atId == null)
builder.clearAppAttemptId();
protected void setApplicationAttemptId(ApplicationAttemptId atId) {
if (atId != null) {
Preconditions.checkNotNull(builder);
builder.setAppAttemptId(convertToProtoFormat(atId));
}
this.applicationAttemptId = atId;
}
@ -115,4 +80,10 @@ private ApplicationAttemptIdProto convertToProtoFormat(
ApplicationAttemptId t) {
return ((ApplicationAttemptIdPBImpl)t).getProto();
}
@Override
protected void build() {
proto = builder.build();
builder = null;
}
}

View File

@ -21,14 +21,14 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProtoOrBuilder;
import com.google.common.base.Preconditions;
public class NodeIdPBImpl extends NodeId {
NodeIdProto proto = NodeIdProto.getDefaultInstance();
NodeIdProto proto = null;
NodeIdProto.Builder builder = null;
boolean viaProto = false;
public NodeIdPBImpl() {
builder = NodeIdProto.newBuilder();
@ -36,43 +36,39 @@ public NodeIdPBImpl() {
public NodeIdPBImpl(NodeIdProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized NodeIdProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
public NodeIdProto getProto() {
return proto;
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = NodeIdProto.newBuilder(proto);
}
viaProto = false;
@Override
public String getHost() {
Preconditions.checkNotNull(proto);
return proto.getHost();
}
@Override
public synchronized String getHost() {
NodeIdProtoOrBuilder p = viaProto ? proto : builder;
return (p.getHost());
protected void setHost(String host) {
Preconditions.checkNotNull(builder);
builder.setHost(host);
}
@Override
public synchronized void setHost(String host) {
maybeInitBuilder();
builder.setHost((host));
public int getPort() {
Preconditions.checkNotNull(proto);
return proto.getPort();
}
@Override
public synchronized int getPort() {
NodeIdProtoOrBuilder p = viaProto ? proto : builder;
return (p.getPort());
protected void setPort(int port) {
Preconditions.checkNotNull(builder);
builder.setPort(port);
}
@Override
public synchronized void setPort(int port) {
maybeInitBuilder();
builder.setPort((port));
protected void build() {
proto = builder.build();
builder = null;
}
}

View File

@ -185,10 +185,7 @@ public void launchAM(ApplicationAttemptId attemptId) throws IOException {
if(!setClasspath && classpath!=null) {
envAMList.add("CLASSPATH="+classpath);
}
ContainerId containerId = Records.newRecord(ContainerId.class);
containerId.setApplicationAttemptId(attemptId);
containerId.setId(0);
ContainerId containerId = ContainerId.newInstance(attemptId, 0);
String hostname = InetAddress.getLocalHost().getHostName();
envAMList.add(Environment.CONTAINER_ID.name() + "=" + containerId);

View File

@ -147,11 +147,7 @@ public static ApplicationId newApplicationId(long clusterTimeStamp, int id) {
public static ApplicationAttemptId newApplicationAttemptId(
ApplicationId appId, int attemptId) {
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(attemptId);
return appAttemptId;
return ApplicationAttemptId.newInstance(appId, attemptId);
}
public static ApplicationId convert(long clustertimestamp, CharSequence id) {
@ -161,10 +157,7 @@ public static ApplicationId convert(long clustertimestamp, CharSequence id) {
public static ContainerId newContainerId(ApplicationAttemptId appAttemptId,
int containerId) {
ContainerId id = recordFactory.newRecordInstance(ContainerId.class);
id.setId(containerId);
id.setApplicationAttemptId(appAttemptId);
return id;
return ContainerId.newInstance(appAttemptId, containerId);
}
public static ContainerId newContainerId(int appId, int appAttemptId,
@ -189,26 +182,11 @@ public static ContainerToken newContainerToken(ContainerId cId, String host,
public static ContainerId newContainerId(RecordFactory recordFactory,
ApplicationId appId, ApplicationAttemptId appAttemptId,
int containerId) {
ContainerId id = recordFactory.newRecordInstance(ContainerId.class);
id.setId(containerId);
id.setApplicationAttemptId(appAttemptId);
return id;
}
public static ContainerId newContainerId(RecordFactory recordFactory,
ApplicationAttemptId appAttemptId,
int containerId) {
ContainerId id = recordFactory.newRecordInstance(ContainerId.class);
id.setApplicationAttemptId(appAttemptId);
id.setId(containerId);
return id;
return ContainerId.newInstance(appAttemptId, containerId);
}
public static NodeId newNodeId(String host, int port) {
NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
nodeId.setHost(host);
nodeId.setPort(port);
return nodeId;
return NodeId.newInstance(host, port);
}
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,

View File

@ -123,10 +123,8 @@ private static ApplicationAttemptId toApplicationAttemptId(
Iterator<String> it) throws NumberFormatException {
ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
Integer.parseInt(it.next()));
ApplicationAttemptId appAttemptId = Records
.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(Integer.parseInt(it.next()));
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, Integer.parseInt(it.next()));
return appAttemptId;
}
@ -164,9 +162,8 @@ public static ContainerId toContainerId(String containerIdStr) {
}
try {
ApplicationAttemptId appAttemptID = toApplicationAttemptId(it);
ContainerId containerId = Records.newRecord(ContainerId.class);
containerId.setApplicationAttemptId(appAttemptID);
containerId.setId(Integer.parseInt(it.next()));
ContainerId containerId =
ContainerId.newInstance(appAttemptID, Integer.parseInt(it.next()));
return containerId;
} catch (NumberFormatException n) {
throw new IllegalArgumentException("Invalid ContainerId: "

View File

@ -20,10 +20,8 @@
import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.collect.Iterators;
@ -65,13 +63,6 @@ public static ApplicationId newAppID(int i) {
return ApplicationId.newInstance(TS, i);
}
public static ApplicationAttemptId newAppAttemptID(ApplicationId appId, int i) {
ApplicationAttemptId id = Records.newRecord(ApplicationAttemptId.class);
id.setApplicationId(appId);
id.setAttemptId(i);
return id;
}
public static YarnApplicationState newAppState() {
synchronized(STATES) {
return STATES.next();

View File

@ -55,7 +55,6 @@
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.annotations.VisibleForTesting;
@ -288,7 +287,7 @@ protected void cleanupContainers(NodeManagerEventType eventType) {
public static class NMContext implements Context {
private final NodeId nodeId = Records.newRecord(NodeId.class);
private NodeId nodeId = null;
private final ConcurrentMap<ApplicationId, Application> applications =
new ConcurrentHashMap<ApplicationId, Application>();
private final ConcurrentMap<ContainerId, Container> containers =
@ -351,6 +350,10 @@ public void setContainerManager(ContainerManager containerManager) {
public void setWebServer(WebServer webServer) {
this.webServer = webServer;
}
public void setNodeId(NodeId nodeId) {
this.nodeId = nodeId;
}
}

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -71,6 +72,7 @@
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
@ -249,8 +251,9 @@ public void start() {
this.setBlockNewContainerRequests(true);
server.start();
InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
this.context.getNodeId().setHost(connectAddress.getHostName());
this.context.getNodeId().setPort(connectAddress.getPort());
NodeId nodeId = NodeId.newInstance(connectAddress.getHostName(),
connectAddress.getPort());
((NodeManager.NMContext)context).setNodeId(nodeId);
LOG.info("ContainerManager started at " + connectAddress);
super.start();
}

View File

@ -126,13 +126,11 @@ public long getRMIdentifier() {
ContainerLaunchContext launchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ContainerId cID = recordFactory.newRecordInstance(ContainerId.class);
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId applicationAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
applicationAttemptId.setApplicationId(applicationId);
applicationAttemptId.setAttemptId(0);
cID.setApplicationAttemptId(applicationAttemptId);
ApplicationAttemptId.newInstance(applicationId, 0);
ContainerId cID = ContainerId.newInstance(applicationAttemptId, 0);
Resource r = BuilderUtils.newResource(1024, 1);
String user = "testing";
String host = "127.0.0.1";

View File

@ -243,13 +243,8 @@ private void createFiles(String dir, String subDir, int numOfFiles) {
private ContainerId createContainerId() {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
Records.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(1);
ContainerId containerId =
Records.newRecord(ContainerId.class);
containerId.setApplicationAttemptId(appAttemptId);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
return containerId;
}

View File

@ -212,14 +212,9 @@ public ContainerManager run() {
public static ContainerId createContainerId() {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(1);
ContainerId containerId =
recordFactory.newRecordInstance(ContainerId.class);
containerId.setApplicationAttemptId(appAttemptId);
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
return containerId;
}

View File

@ -152,13 +152,6 @@ public RegisterNodeManagerResponse registerNodeManager(
return response;
}
ApplicationAttemptId appAttemptID = recordFactory
.newRecordInstance(ApplicationAttemptId.class);
ContainerId firstContainerID = recordFactory
.newRecordInstance(ContainerId.class);
ContainerId secondContainerID = recordFactory
.newRecordInstance(ContainerId.class);
private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap(
List<ContainerStatus> containers) {
Map<ApplicationId, List<ContainerStatus>> map =
@ -196,9 +189,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
// Give a container to the NM.
appAttemptID.setApplicationId(appId1);
firstContainerID.setApplicationAttemptId(appAttemptID);
firstContainerID.setId(heartBeatID);
ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId1, 0);
ContainerId firstContainerID =
ContainerId.newInstance(appAttemptID, heartBeatID);
ContainerLaunchContext launchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
Resource resource = BuilderUtils.newResource(2, 1);
@ -226,9 +220,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
Assert.assertEquals(1, activeContainers.size());
// Give another container to the NM.
appAttemptID.setApplicationId(appId2);
secondContainerID.setApplicationAttemptId(appAttemptID);
secondContainerID.setId(heartBeatID);
ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId2, 0);
ContainerId secondContainerID =
ContainerId.newInstance(appAttemptID, heartBeatID);
ContainerLaunchContext launchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
long currentTime = System.currentTimeMillis();

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;

View File

@ -76,12 +76,8 @@ public TestContainerManager() throws UnsupportedFileSystemException {
private ContainerId createContainerId() {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(1);
ContainerId containerId =
recordFactory.newRecordInstance(ContainerId.class);
containerId.setApplicationAttemptId(appAttemptId);
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
return containerId;
}

View File

@ -163,14 +163,10 @@ public void testContainerEnvVariables() throws Exception {
// ////// Construct the Container-id
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(1);
ContainerId cId =
recordFactory.newRecordInstance(ContainerId.class);
int port = 12345;
cId.setApplicationAttemptId(appAttemptId);
ApplicationAttemptId.newInstance(appId, 1);
int port = 12345;
ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
Map<String, String> userSetEnv = new HashMap<String, String>();
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST");
@ -320,13 +316,8 @@ public void testDelayedKill() throws Exception {
// ////// Construct the Container-id
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(1);
ContainerId cId =
recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(appAttemptId);
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
File processStartFile =
new File(tmpDir, "pid.txt").getAbsoluteFile();

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -78,6 +79,7 @@
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@ -113,6 +115,13 @@ public TestLogAggregationService() throws UnsupportedFileSystemException {
this.remoteRootLogDir.mkdir();
}
@Override
public void setup() throws IOException {
super.setup();
NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
((NMContext)context).setNodeId(nodeId);
}
@Override
public void tearDown() throws IOException, InterruptedException {
super.tearDown();

View File

@ -202,13 +202,8 @@ public void testContainerKillOnMemoryOverflow() throws IOException,
recordFactory.newRecordInstance(ContainerLaunchContext.class);
// ////// Construct the Container-id
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(1);
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setId(0);
cId.setApplicationAttemptId(appAttemptId);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
int port = 12345;
URL resource_alpha =

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@ -86,8 +87,8 @@ public class TestNMWebServices extends JerseyTest {
@Override
protected void configureServlets() {
nmContext = new NodeManager.NMContext(null);
nmContext.getNodeId().setHost("testhost.foo.com");
nmContext.getNodeId().setPort(8042);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -93,8 +94,8 @@ public class TestNMWebServicesApps extends JerseyTest {
@Override
protected void configureServlets() {
nmContext = new NodeManager.NMContext(null);
nmContext.getNodeId().setHost("testhost.foo.com");
nmContext.getNodeId().setPort(9999);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {

View File

@ -71,7 +71,6 @@
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
public class RMAppImpl implements RMApp, Recoverable {
@ -579,11 +578,8 @@ public void recover(RMState state) {
@SuppressWarnings("unchecked")
private void createNewAttempt(boolean startAttempt) {
ApplicationAttemptId appAttemptId = Records
.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(applicationId);
appAttemptId.setAttemptId(attempts.size() + 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf, user);

View File

@ -52,7 +52,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.util.Records;
@Private
public class Application {
@ -99,9 +98,9 @@ public Application(String user, String queue, ResourceManager resourceManager) {
this.resourceManager = resourceManager;
this.applicationId =
this.resourceManager.getClientRMService().getNewApplicationId();
this.applicationAttemptId = Records.newRecord(ApplicationAttemptId.class);
this.applicationAttemptId.setApplicationId(this.applicationId);
this.applicationAttemptId.setAttemptId(this.numAttempts.getAndIncrement());
this.applicationAttemptId =
ApplicationAttemptId.newInstance(this.applicationId,
this.numAttempts.getAndIncrement());
}
public String getUser() {

View File

@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -71,13 +70,6 @@ public static List<RMNode> deactivatedNodes(int racks, int nodesPerRack,
return list;
}
public static NodeId newNodeID(String host, int port) {
NodeId nid = recordFactory.newRecordInstance(NodeId.class);
nid.setHost(host);
nid.setPort(port);
return nid;
}
public static Resource newResource(int mem) {
Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory(mem);
@ -214,7 +206,8 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
if (hostName == null) {
hostName = "host"+ nid;
}
final NodeId nodeID = newNodeID(hostName, port);
final NodeId nodeID = NodeId.newInstance(hostName, port);
final String httpAddress = httpAddr;
final NodeHealthStatus nodeHealthStatus =
recordFactory.newRecordInstance(NodeHealthStatus.class);

View File

@ -91,9 +91,7 @@ public NodeManager(String hostName, int containerManagerPort, int httpPort,
this.capability = capability;
Resources.addTo(available, capability);
this.nodeId = recordFactory.newRecordInstance(NodeId.class);
this.nodeId.setHost(hostName);
this.nodeId.setPort(containerManagerPort);
this.nodeId = NodeId.newInstance(hostName, containerManagerPort);
RegisterNodeManagerRequest request = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);

View File

@ -37,7 +37,6 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
@ -260,9 +259,7 @@ public void testNodeRegistrationFailure() throws Exception {
ResourceTrackerService resourceTrackerService = rm.getResourceTrackerService();
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
NodeId nodeId = Records.newRecord(NodeId.class);
nodeId.setHost("host2");
nodeId.setPort(1234);
NodeId nodeId = NodeId.newInstance("host2", 1234);
req.setNodeId(nodeId);
req.setHttpPort(1234);
// trying to register a invalid node.

View File

@ -139,10 +139,10 @@ public String getApplicationType() {
}
public static RMApp newApplication(int i) {
final ApplicationAttemptId appAttemptId = newAppAttemptID(newAppID(i), 0);
final ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(newAppID(i), 0);
final Container masterContainer = Records.newRecord(Container.class);
ContainerId containerId = Records.newRecord(ContainerId.class);
containerId.setApplicationAttemptId(appAttemptId);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
masterContainer.setId(containerId);
masterContainer.setNodeHttpAddress("node:port");
final String user = newUserName();

View File

@ -44,7 +44,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Before;
import org.junit.Test;
@ -132,9 +131,7 @@ public void testNMExpiry() throws Exception {
RegisterNodeManagerRequest request1 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId1 = Records.newRecord(NodeId.class);
nodeId1.setPort(0);
nodeId1.setHost(hostname1);
NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
request1.setNodeId(nodeId1);
request1.setHttpPort(0);
request1.setResource(capability);
@ -142,9 +139,7 @@ public void testNMExpiry() throws Exception {
RegisterNodeManagerRequest request2 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId2 = Records.newRecord(NodeId.class);
nodeId2.setPort(0);
nodeId2.setHost(hostname2);
NodeId nodeId2 = NodeId.newInstance(hostname2, 0);
request2.setNodeId(nodeId2);
request2.setHttpPort(0);
request2.setResource(capability);
@ -160,9 +155,7 @@ public void testNMExpiry() throws Exception {
request3 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId3 = Records.newRecord(NodeId.class);
nodeId3.setPort(0);
nodeId3.setHost(hostname3);
NodeId nodeId3 = NodeId.newInstance(hostname3, 0);
request3.setNodeId(nodeId3);
request3.setHttpPort(0);
request3.setResource(capability);

View File

@ -45,7 +45,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -93,9 +92,7 @@ public void testRPCResponseId() throws IOException, YarnRemoteException {
String node = "localhost";
Resource capability = BuilderUtils.newResource(1024, 1);
RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
nodeId = Records.newRecord(NodeId.class);
nodeId.setHost(node);
nodeId.setPort(1234);
nodeId = NodeId.newInstance(node, 1234);
request.setNodeId(nodeId);
request.setHttpPort(0);
request.setResource(capability);

View File

@ -195,7 +195,7 @@ null, new ApplicationTokenSecretManager(conf),
ApplicationId applicationId = MockApps.newAppID(appId++);
ApplicationAttemptId applicationAttemptId =
MockApps.newAppAttemptID(applicationId, 0);
ApplicationAttemptId.newInstance(applicationId, 0);
final String user = MockApps.newUserName();
final String queue = MockApps.newQueue();

View File

@ -23,22 +23,17 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFSSchedulerApp {
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
attId.setAttemptId(attemptId);
attId.setApplicationId(appIdImpl);
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
return attId;
}

View File

@ -142,10 +142,9 @@ private Configuration createConfiguration() {
}
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
attId.setAttemptId(attemptId);
attId.setApplicationId(appIdImpl);
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
return attId;
}
@ -1642,9 +1641,8 @@ public void testNotAllowSubmitApplication() throws Exception {
assertEquals("The application doesn't reach SUBMITTED.",
RMAppState.SUBMITTED, application.getState());
ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
attId.setAttemptId(this.ATTEMPT_ID++);
attId.setApplicationId(applicationId);
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
scheduler.addApplication(attId, queue, user);
numTries = 0;

View File

@ -95,11 +95,9 @@ public void tearDown() throws Exception {
}
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationAttemptId attId = recordFactory
.newRecordInstance(ApplicationAttemptId.class);
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
attId.setAttemptId(attemptId);
attId.setApplicationId(appIdImpl);
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
return attId;
}