YARN-2312. Deprecated old ContainerId#getId API and updated MapReduce to use ContainerId#getContainerId instead. Contributed by Tsuyoshi OZAWA

This commit is contained in:
Jian He 2014-10-15 15:22:07 -07:00
parent f19771a24c
commit 0af1a2b5bc
25 changed files with 125 additions and 73 deletions

View File

@ -157,7 +157,7 @@ public class MapReduceChildJVM {
public static List<String> getVMCommand(
InetSocketAddress taskAttemptListenerAddr, Task task,
ID jvmID) {
JVMId jvmID) {
TaskAttemptID attemptID = task.getTaskID();
JobConf conf = task.conf;

View File

@ -23,8 +23,8 @@ package org.apache.hadoop.mapred;
*/
public class WrappedJvmID extends JVMId {
public WrappedJvmID(JobID jobID, boolean mapTask, int nextInt) {
super(jobID, mapTask, nextInt);
public WrappedJvmID(JobID jobID, boolean mapTask, long nextLong) {
super(jobID, mapTask, nextLong);
}
}

View File

@ -82,9 +82,9 @@ class YarnChild {
final InetSocketAddress address =
NetUtils.createSocketAddrForHost(host, port);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
int jvmIdInt = Integer.parseInt(args[3]);
long jvmIdLong = Long.parseLong(args[3]);
JVMId jvmId = new JVMId(firstTaskid.getJobID(),
firstTaskid.getTaskType() == TaskType.MAP, jvmIdInt);
firstTaskid.getTaskType() == TaskType.MAP, jvmIdLong);
// initialize metrics
DefaultMetricsSystem.initialize(

View File

@ -1517,8 +1517,8 @@ public abstract class TaskAttemptImpl implements
taskAttempt.remoteTask = taskAttempt.createRemoteTask();
taskAttempt.jvmID =
new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(),
taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId()
.getId());
taskAttempt.remoteTask.isMapTask(),
taskAttempt.container.getId().getContainerId());
taskAttempt.taskAttemptListener.registerPendingTask(
taskAttempt.remoteTask, taskAttempt.jvmID);

View File

@ -150,7 +150,7 @@ public class LocalContainerAllocator extends RMCommunicator
// Assign the same container ID as the AM
ContainerId cID =
ContainerId.newInstance(getContext().getApplicationAttemptId(),
this.containerId.getId());
this.containerId.getContainerId());
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(cID);
NodeId nodeId = NodeId.newInstance(this.nmHost, this.nmPort);

View File

@ -244,8 +244,9 @@ public class MRAppBenchmark {
getContext().getApplicationAttemptId(),
request.getResponseId() + i);
containers.add(Container.newInstance(containerId,
NodeId.newInstance("host" + containerId.getId(), 2345),
"host" + containerId.getId() + ":5678",
NodeId.newInstance(
"host" + containerId.getContainerId(), 2345),
"host" + containerId.getContainerId() + ":5678",
req.getCapability(), req.getPriority(), null));
}
}

View File

@ -101,7 +101,7 @@ public class TestCheckpointPreemptionPolicy {
for (Map.Entry<ContainerId,TaskAttemptId> ent :
assignedContainers.entrySet()) {
System.out.println("cont:" + ent.getKey().getId() +
System.out.println("cont:" + ent.getKey().getContainerId() +
" type:" + ent.getValue().getTaskId().getTaskType() +
" res:" + contToResourceMap.get(ent.getKey()).getMemory() + "MB" );
}

View File

@ -23,23 +23,25 @@ import java.io.DataOutput;
import java.io.IOException;
import java.text.NumberFormat;
class JVMId extends ID {
class JVMId {
boolean isMap;
JobID jobId;
final JobID jobId;
private long jvmId;
private static final String JVM = "jvm";
private static final char SEPARATOR = '_';
private static NumberFormat idFormat = NumberFormat.getInstance();
static {
idFormat.setGroupingUsed(false);
idFormat.setMinimumIntegerDigits(6);
}
public JVMId(JobID jobId, boolean isMap, int id) {
super(id);
public JVMId(JobID jobId, boolean isMap, long id) {
this.jvmId = id;
this.isMap = isMap;
this.jobId = jobId;
}
public JVMId (String jtIdentifier, int jobId, boolean isMap, int id) {
public JVMId (String jtIdentifier, int jobId, boolean isMap, long id) {
this(new JobID(jtIdentifier, jobId), isMap, id);
}
@ -53,27 +55,50 @@ class JVMId extends ID {
public JobID getJobId() {
return jobId;
}
@Override
public boolean equals(Object o) {
if(o == null)
return false;
if(o.getClass().equals(this.getClass())) {
JVMId that = (JVMId)o;
return this.id==that.id
&& this.isMap == that.isMap
&& this.jobId.equals(that.jobId);
// Generated by IntelliJ IDEA 13.1.
if (this == o) {
return true;
}
else return false;
if (o == null || getClass() != o.getClass()) {
return false;
}
JVMId jvmId1 = (JVMId) o;
if (isMap != jvmId1.isMap) {
return false;
}
if (jvmId != jvmId1.jvmId) {
return false;
}
if (!jobId.equals(jvmId1.jobId)) {
return false;
}
return true;
}
/**Compare TaskInProgressIds by first jobIds, then by tip numbers. Reduces are
* defined as greater then maps.*/
@Override
public int compareTo(org.apache.hadoop.mapreduce.ID o) {
JVMId that = (JVMId)o;
public int hashCode() {
// Generated by IntelliJ IDEA 13.1.
int result = (isMap ? 1 : 0);
result = 31 * result + jobId.hashCode();
result = 31 * result + (int) (jvmId ^ (jvmId >>> 32));
return result;
}
/**
* Compare TaskInProgressIds by first jobIds, then by tip numbers. Reduces are
* defined as greater then maps.
**/
public int compareTo(JVMId that) {
int jobComp = this.jobId.compareTo(that.jobId);
if(jobComp == 0) {
if(this.isMap == that.isMap) {
return this.id - that.id;
return Long.valueOf(this.jvmId).compareTo(that.jvmId);
} else {
return this.isMap ? -1 : 1;
}
@ -87,6 +112,15 @@ class JVMId extends ID {
return appendTo(new StringBuilder(JVM)).toString();
}
/**
* This method does NOT override org.apache.hadoop.mapred.ID to accept 64-bit
* ID to support work-preserving RM restart.
* @return 64-bit JVM id.
*/
public long getId() {
return jvmId;
}
/**
* Add the unique id to the given StringBuilder.
* @param builder the builder to append to
@ -97,24 +131,17 @@ class JVMId extends ID {
append(SEPARATOR).
append(isMap ? 'm' : 'r').
append(SEPARATOR).
append(idFormat.format(id));
append(idFormat.format(jvmId));
}
@Override
public int hashCode() {
return jobId.hashCode() * 11 + id;
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
this.jvmId = in.readLong();
this.jobId.readFields(in);
this.isMap = in.readBoolean();
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeLong(jvmId);
jobId.write(out);
out.writeBoolean(isMap);
}

View File

@ -671,7 +671,8 @@ public class TestMRJobs {
if (!foundAppMaster) {
final ContainerId cid = ConverterUtils.toContainerId(
containerPathComponent.getName());
foundAppMaster = (cid.getId() == 1);
foundAppMaster =
((cid.getContainerId() & ContainerId.CONTAINER_ID_BITMASK)== 1);
}
final FileStatus[] sysSiblings = localFs.globStatus(new Path(

View File

@ -348,6 +348,9 @@ Release 2.6.0 - UNRELEASED
YARN-2583. Modified AggregatedLogDeletionService to be able to delete rolling
aggregated logs. (Xuan Gong via zjshen)
YARN-2312. Deprecated old ContainerId#getId API and updated MapReduce to
use ContainerId#getContainerId instead. (Tsuyoshi OZAWA via jianhe)
OPTIMIZATIONS
BUG FIXES

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.util.Records;
@Public
@Stable
public abstract class ContainerId implements Comparable<ContainerId>{
public static final long CONTAINER_ID_BITMASK = 0xffffffffffL;
private static final Splitter _SPLITTER = Splitter.on('_').trimResults();
private static final String CONTAINER_PREFIX = "container";
private static final String EPOCH_PREFIX = "e";
@ -81,6 +82,7 @@ public abstract class ContainerId implements Comparable<ContainerId>{
* @return lower 32 bits of identifier of the <code>ContainerId</code>
*/
@Public
@Deprecated
@Stable
public abstract int getId();
@ -184,7 +186,8 @@ public abstract class ContainerId implements Comparable<ContainerId>{
sb.append(
appAttemptIdAndEpochFormat.get().format(
getApplicationAttemptId().getAttemptId())).append("_");
sb.append(containerIdFormat.get().format(0xffffffffffL & getContainerId()));
sb.append(containerIdFormat.get()
.format(CONTAINER_ID_BITMASK & getContainerId()));
return sb.toString();
}

View File

@ -219,6 +219,7 @@ public class TestNMClientAsync {
actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
}
@SuppressWarnings("deprecation")
@Override
public void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse) {
@ -242,6 +243,7 @@ public class TestNMClientAsync {
throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation")
@Override
public void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus) {
@ -259,6 +261,7 @@ public class TestNMClientAsync {
throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation")
@Override
public void onContainerStopped(ContainerId containerId) {
if (containerId.getId() >= expectedSuccess) {
@ -273,6 +276,7 @@ public class TestNMClientAsync {
throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation")
@Override
public void onStartContainerError(ContainerId containerId, Throwable t) {
// If the unexpected throwable comes from success callback functions, it
@ -296,6 +300,7 @@ public class TestNMClientAsync {
throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation")
@Override
public void onStopContainerError(ContainerId containerId, Throwable t) {
if (t instanceof RuntimeException) {
@ -316,6 +321,7 @@ public class TestNMClientAsync {
throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation")
@Override
public void onGetContainerStatusError(ContainerId containerId,
Throwable t) {

View File

@ -47,6 +47,7 @@ public class ContainerIdPBImpl extends ContainerId {
return proto;
}
@Deprecated
@Override
public int getId() {
Preconditions.checkNotNull(proto);

View File

@ -68,6 +68,7 @@ public class ApplicationHistoryStoreTestUtils {
FinalApplicationStatus.UNDEFINED, YarnApplicationAttemptState.FINISHED));
}
@SuppressWarnings("deprecation")
protected void writeContainerStartData(ContainerId containerId)
throws IOException {
store.containerStarted(ContainerStartData.newInstance(containerId,

View File

@ -138,6 +138,7 @@ public class TestFileSystemApplicationHistoryStore extends
testReadHistoryData(num, false, false);
}
@SuppressWarnings("deprecation")
private void testReadHistoryData(
int num, boolean missingContainer, boolean missingApplicationAttempt)
throws IOException {

View File

@ -130,6 +130,7 @@ public class TestMemoryApplicationHistoryStore extends
}
}
@SuppressWarnings("deprecation")
@Test
public void testReadWriteContainerHistory() throws Exception {
// Out of order

View File

@ -265,6 +265,7 @@ public class TestAHSWebServices extends JerseyTest {
assertEquals("incorrect number of elements", 5, array.length());
}
@SuppressWarnings("deprecation")
@Test
public void testSingleContainer() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);

View File

@ -199,8 +199,8 @@ public class TestYarnServerApiClasses {
original.setResponseId(1);
NodeStatusPBImpl copy = new NodeStatusPBImpl(original.getProto());
assertEquals(3, copy.getContainersStatuses().get(1).getContainerId()
.getId());
assertEquals(3L, copy.getContainersStatuses().get(1).getContainerId()
.getContainerId());
assertEquals(3, copy.getKeepAliveApplications().get(0).getId());
assertEquals(1000, copy.getNodeHealthStatus().getLastHealthReportTime());
assertEquals(9090, copy.getNodeId().getPort());

View File

@ -430,7 +430,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
// AM Container only
if (this.retentionPolicy
.equals(ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY)) {
if (containerId.getId() == 1) {
if ((containerId.getContainerId()
& ContainerId.CONTAINER_ID_BITMASK)== 1) {
return true;
}
return false;
@ -439,7 +440,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
// AM + Failing containers
if (this.retentionPolicy
.equals(ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY)) {
if (containerId.getId() == 1) {
if ((containerId.getContainerId()
& ContainerId.CONTAINER_ID_BITMASK) == 1) {
return true;
} else if(!wasContainerSuccessful) {
return true;

View File

@ -665,13 +665,13 @@ public class TestContainerManager extends BaseContainerManagerTest {
Assert.assertEquals(5, response.getSuccessfullyStartedContainers().size());
for (ContainerId id : response.getSuccessfullyStartedContainers()) {
// Containers with odd id should succeed.
Assert.assertEquals(1, id.getId() & 1);
Assert.assertEquals(1, id.getContainerId() & 1);
}
Assert.assertEquals(5, response.getFailedRequests().size());
for (Map.Entry<ContainerId, SerializedException> entry : response
.getFailedRequests().entrySet()) {
// Containers with even id should fail.
Assert.assertEquals(0, entry.getKey().getId() & 1);
Assert.assertEquals(0, entry.getKey().getContainerId() & 1);
Assert.assertTrue(entry.getValue().getMessage()
.contains(
"Container " + entry.getKey() + " rejected as it is allocated by a previous RM"));
@ -718,13 +718,13 @@ public class TestContainerManager extends BaseContainerManagerTest {
Assert.assertEquals(5, statusResponse.getContainerStatuses().size());
for (ContainerStatus status : statusResponse.getContainerStatuses()) {
// Containers with odd id should succeed
Assert.assertEquals(1, status.getContainerId().getId() & 1);
Assert.assertEquals(1, status.getContainerId().getContainerId() & 1);
}
Assert.assertEquals(5, statusResponse.getFailedRequests().size());
for (Map.Entry<ContainerId, SerializedException> entry : statusResponse
.getFailedRequests().entrySet()) {
// Containers with even id should fail.
Assert.assertEquals(0, entry.getKey().getId() & 1);
Assert.assertEquals(0, entry.getKey().getContainerId() & 1);
Assert.assertTrue(entry.getValue().getMessage()
.contains("Reject this container"));
}
@ -738,13 +738,13 @@ public class TestContainerManager extends BaseContainerManagerTest {
.size());
for (ContainerId id : stopResponse.getSuccessfullyStoppedContainers()) {
// Containers with odd id should succeed.
Assert.assertEquals(1, id.getId() & 1);
Assert.assertEquals(1, id.getContainerId() & 1);
}
Assert.assertEquals(5, stopResponse.getFailedRequests().size());
for (Map.Entry<ContainerId, SerializedException> entry : stopResponse
.getFailedRequests().entrySet()) {
// Containers with even id should fail.
Assert.assertEquals(0, entry.getKey().getId() & 1);
Assert.assertEquals(0, entry.getKey().getContainerId() & 1);
Assert.assertTrue(entry.getValue().getMessage()
.contains("Reject this container"));
}

View File

@ -49,7 +49,7 @@ import org.mockito.ArgumentMatcher;
public class TestLocalizedResource {
static ContainerId getMockContainer(int id) {
static ContainerId getMockContainer(long id) {
ApplicationId appId = mock(ApplicationId.class);
when(appId.getClusterTimestamp()).thenReturn(314159265L);
when(appId.getId()).thenReturn(3);
@ -57,7 +57,7 @@ public class TestLocalizedResource {
when(appAttemptId.getApplicationId()).thenReturn(appId);
when(appAttemptId.getAttemptId()).thenReturn(0);
ContainerId container = mock(ContainerId.class);
when(container.getId()).thenReturn(id);
when(container.getContainerId()).thenReturn(id);
when(container.getApplicationAttemptId()).thenReturn(appAttemptId);
return container;
}
@ -77,7 +77,7 @@ public class TestLocalizedResource {
// mock resource
LocalResource apiRsrc = createMockResource();
final ContainerId container0 = getMockContainer(0);
final ContainerId container0 = getMockContainer(0L);
final Credentials creds0 = new Credentials();
final LocalResourceVisibility vis0 = LocalResourceVisibility.PRIVATE;
final LocalizerContext ctxt0 =
@ -96,7 +96,7 @@ public class TestLocalizedResource {
// Register C1, verify request event
final Credentials creds1 = new Credentials();
final ContainerId container1 = getMockContainer(1);
final ContainerId container1 = getMockContainer(1L);
final LocalizerContext ctxt1 =
new LocalizerContext("yak", container1, creds1);
final LocalResourceVisibility vis1 = LocalResourceVisibility.PUBLIC;
@ -120,13 +120,13 @@ public class TestLocalizedResource {
assertEquals(ResourceState.DOWNLOADING, local.getState());
// Register C2, C3
final ContainerId container2 = getMockContainer(2);
final ContainerId container2 = getMockContainer(2L);
final LocalResourceVisibility vis2 = LocalResourceVisibility.PRIVATE;
final Credentials creds2 = new Credentials();
final LocalizerContext ctxt2 =
new LocalizerContext("yak", container2, creds2);
final ContainerId container3 = getMockContainer(3);
final ContainerId container3 = getMockContainer(3L);
final LocalResourceVisibility vis3 = LocalResourceVisibility.PRIVATE;
final Credentials creds3 = new Credentials();
final LocalizerContext ctxt3 =
@ -159,7 +159,7 @@ public class TestLocalizedResource {
assertEquals(ResourceState.LOCALIZED, local.getState());
// Register C4, verify notification
final ContainerId container4 = getMockContainer(4);
final ContainerId container4 = getMockContainer(4L);
final Credentials creds4 = new Credentials();
final LocalizerContext ctxt4 =
new LocalizerContext("yak", container4, creds4);

View File

@ -134,7 +134,7 @@ public class MockNM {
}
public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
int containerId, ContainerState containerState) throws Exception {
long containerId, ContainerState containerState) throws Exception {
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
new HashMap<ApplicationId, List<ContainerStatus>>(1);
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(

View File

@ -169,7 +169,7 @@ public class TestContainerResourceUsage {
// launch the 2nd and 3rd containers.
for (Container c : conts) {
nm.nodeHeartbeat(attempt0.getAppAttemptId(),
c.getId().getId(), ContainerState.RUNNING);
c.getId().getContainerId(), ContainerState.RUNNING);
rm0.waitForState(nm, c.getId(), RMContainerState.RUNNING);
}
@ -185,9 +185,9 @@ public class TestContainerResourceUsage {
// Stop all non-AM containers
for (Container c : conts) {
if (c.getId().getId() == 1) continue;
if (c.getId().getContainerId() == 1) continue;
nm.nodeHeartbeat(attempt0.getAppAttemptId(),
c.getId().getId(), ContainerState.COMPLETE);
c.getId().getContainerId(), ContainerState.COMPLETE);
rm0.waitForState(nm, c.getId(), RMContainerState.COMPLETED);
}
@ -198,7 +198,7 @@ public class TestContainerResourceUsage {
// that all containers will complete prior to saving.
ContainerId cId = ContainerId.newInstance(attempt0.getAppAttemptId(), 1);
nm.nodeHeartbeat(attempt0.getAppAttemptId(),
cId.getId(), ContainerState.COMPLETE);
cId.getContainerId(), ContainerState.COMPLETE);
rm0.waitForState(nm, cId, RMContainerState.COMPLETED);
// Check that the container metrics match those from the app usage report.
@ -280,7 +280,7 @@ public class TestContainerResourceUsage {
ContainerId containerId2 =
ContainerId.newInstance(am0.getApplicationAttemptId(), 2);
nm.nodeHeartbeat(am0.getApplicationAttemptId(),
containerId2.getId(), ContainerState.RUNNING);
containerId2.getContainerId(), ContainerState.RUNNING);
rm.waitForState(nm, containerId2, RMContainerState.RUNNING);
// Capture the containers here so the metrics can be calculated after the
@ -295,7 +295,7 @@ public class TestContainerResourceUsage {
ContainerId amContainerId =
app.getCurrentAppAttempt().getMasterContainer().getId();
nm.nodeHeartbeat(am0.getApplicationAttemptId(),
amContainerId.getId(), ContainerState.COMPLETE);
amContainerId.getContainerId(), ContainerState.COMPLETE);
am0.waitForState(RMAppAttemptState.FAILED);
long memorySeconds = 0;
@ -365,7 +365,7 @@ public class TestContainerResourceUsage {
// earlier attempt's attemptId
amContainerId = app.getCurrentAppAttempt().getMasterContainer().getId();
nm.nodeHeartbeat(am0.getApplicationAttemptId(),
amContainerId.getId(), ContainerState.COMPLETE);
amContainerId.getContainerId(), ContainerState.COMPLETE);
MockRM.finishAMAndVerifyAppState(app, rm, nm, am1);

View File

@ -214,7 +214,8 @@ public class TestRM extends ParameterizedSchedulerTestBase {
nm1.nodeHeartbeat(true);
MockAM am = MockRM.launchAM(app, rm, nm1);
// am container Id not equal to 1.
Assert.assertTrue(attempt.getMasterContainer().getId().getId() != 1);
Assert.assertTrue(
attempt.getMasterContainer().getId().getContainerId() != 1);
// NMSecretManager doesn't record the node on which the am is allocated.
Assert.assertFalse(rm.getRMContext().getNMTokenSecretManager()
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
@ -382,11 +383,13 @@ public class TestRM extends ParameterizedSchedulerTestBase {
am.unregisterAppAttempt();
// marking all the containers as finished.
for (Container container : containersReceivedForNM1) {
nm1.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
nm1.nodeHeartbeat(attempt.getAppAttemptId(),
container.getId().getContainerId(),
ContainerState.COMPLETE);
}
for (Container container : containersReceivedForNM2) {
nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
nm2.nodeHeartbeat(attempt.getAppAttemptId(),
container.getId().getContainerId(),
ContainerState.COMPLETE);
}
nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1,

View File

@ -168,6 +168,7 @@ public class TestUtils {
return node;
}
@SuppressWarnings("deprecation")
public static ContainerId getMockContainerId(FiCaSchedulerApp application) {
ContainerId containerId = mock(ContainerId.class);
doReturn(application.getApplicationAttemptId()).