MAPREDUCE-2965. Streamlined the methods hashCode(), equals(), compareTo() and toString() for all IDs. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1172212 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-18 07:51:11 +00:00
parent 61900651b1
commit 34d3936bcd
18 changed files with 726 additions and 269 deletions

View File

@ -301,6 +301,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2672. MR-279: JobHistory Server needs Analysis this job.
(Robert Evans via mahadev)
MAPREDUCE-2965. Streamlined the methods hashCode(), equals(), compareTo()
and toString() for all IDs. (Siddharth Seth via vinodkv)
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -18,13 +18,95 @@
package org.apache.hadoop.mapreduce.v2.api.records;
import java.text.NumberFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public interface JobId {
/**
* <p><code>JobId</code> represents the <em>globally unique</em>
* identifier for a MapReduce job.</p>
*
* <p>The globally unique nature of the identifier is achieved by using the
* <em>cluster timestamp</em> from the associated ApplicationId. i.e.
* start-time of the <code>ResourceManager</code> along with a monotonically
* increasing counter for the jobId.</p>
*/
public abstract class JobId implements Comparable<JobId> {
/**
* Get the associated <em>ApplicationId</em> which represents the
* start time of the <code>ResourceManager</code> and is used to generate
* the globally unique <code>JobId</code>.
* @return associated <code>ApplicationId</code>
*/
public abstract ApplicationId getAppId();
/**
* Get the short integer identifier of the <code>JobId</code>
* which is unique for all applications started by a particular instance
* of the <code>ResourceManager</code>.
* @return short integer identifier of the <code>JobId</code>
*/
public abstract int getId();
public abstract void setAppId(ApplicationId appId);
public abstract void setId(int id);
protected static final String JOB = "job";
protected static final char SEPARATOR = '_';
static final ThreadLocal<NumberFormat> jobIdFormat =
new ThreadLocal<NumberFormat>() {
@Override
public NumberFormat initialValue() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setGroupingUsed(false);
fmt.setMinimumIntegerDigits(4);
return fmt;
}
};
@Override
public String toString() {
StringBuilder builder = new StringBuilder(JOB);
builder.append(SEPARATOR);
builder.append(getAppId().getClusterTimestamp());
builder.append(SEPARATOR);
builder.append(jobIdFormat.get().format(getId()));
return builder.toString();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + getAppId().hashCode();
result = prime * result + getId();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
JobId other = (JobId) obj;
if (!this.getAppId().equals(other.getAppId()))
return false;
if (this.getId() != other.getId())
return false;
return true;
}
@Override
public int compareTo(JobId other) {
int appIdComp = this.getAppId().compareTo(other.getAppId());
if (appIdComp == 0) {
return this.getId() - other.getId();
} else {
return appIdComp;
}
}
}

View File

@ -18,10 +18,87 @@
package org.apache.hadoop.mapreduce.v2.api.records;
public interface TaskAttemptId {
/**
* <p>
* <code>TaskAttemptId</code> represents the unique identifier for a task
* attempt. Each task attempt is one particular instance of a Map or Reduce Task
* identified by its TaskId.
* </p>
*
* <p>
* TaskAttemptId consists of 2 parts. First part is the <code>TaskId</code>,
* that this <code>TaskAttemptId</code> belongs to. Second part is the task
* attempt number.
* </p>
*/
public abstract class TaskAttemptId implements Comparable<TaskAttemptId> {
/**
* @return the associated TaskId.
*/
public abstract TaskId getTaskId();
/**
* @return the attempt id.
*/
public abstract int getId();
public abstract void setTaskId(TaskId taskId);
public abstract void setId(int id);
protected static final String TASKATTEMPT = "attempt";
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + getId();
result =
prime * result + ((getTaskId() == null) ? 0 : getTaskId().hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TaskAttemptId other = (TaskAttemptId) obj;
if (getId() != other.getId())
return false;
if (!getTaskId().equals(other.getTaskId()))
return false;
return true;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(TASKATTEMPT);
TaskId taskId = getTaskId();
builder.append("_").append(
taskId.getJobId().getAppId().getClusterTimestamp());
builder.append("_").append(
JobId.jobIdFormat.get().format(
getTaskId().getJobId().getAppId().getId()));
builder.append("_");
builder.append(taskId.getTaskType() == TaskType.MAP ? "m" : "r");
builder.append("_")
.append(TaskId.taskIdFormat.get().format(taskId.getId()));
builder.append("_");
builder.append(getId());
return builder.toString();
}
@Override
public int compareTo(TaskAttemptId other) {
int taskIdComp = this.getTaskId().compareTo(other.getTaskId());
if (taskIdComp == 0) {
return this.getId() - other.getId();
} else {
return taskIdComp;
}
}
}

View File

@ -18,12 +18,109 @@
package org.apache.hadoop.mapreduce.v2.api.records;
public interface TaskId {
import java.text.NumberFormat;
/**
* <p>
* <code>TaskId</code> represents the unique identifier for a Map or Reduce
* Task.
* </p>
*
* <p>
* TaskId consists of 3 parts. First part is <code>JobId</code>, that this Task
* belongs to. Second part of the TaskId is either 'm' or 'r' representing
* whether the task is a map task or a reduce task. And the third part is the
* task number.
* </p>
*/
public abstract class TaskId implements Comparable<TaskId> {
/**
* @return the associated <code>JobId</code>
*/
public abstract JobId getJobId();
/**
* @return the type of the task - MAP/REDUCE
*/
public abstract TaskType getTaskType();
/**
* @return the task number.
*/
public abstract int getId();
public abstract void setJobId(JobId jobId);
public abstract void setTaskType(TaskType taskType);
public abstract void setId(int id);
protected static final String TASK = "task";
static final ThreadLocal<NumberFormat> taskIdFormat =
new ThreadLocal<NumberFormat>() {
@Override
public NumberFormat initialValue() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setGroupingUsed(false);
fmt.setMinimumIntegerDigits(6);
return fmt;
}
};
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + getId();
result = prime * result + getJobId().hashCode();
result = prime * result + getTaskType().hashCode();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TaskId other = (TaskId) obj;
if (getId() != other.getId())
return false;
if (!getJobId().equals(other.getJobId()))
return false;
if (getTaskType() != other.getTaskType())
return false;
return true;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(TASK);
JobId jobId = getJobId();
builder.append("_").append(jobId.getAppId().getClusterTimestamp());
builder.append("_").append(
JobId.jobIdFormat.get().format(jobId.getAppId().getId()));
builder.append("_");
builder.append(getTaskType() == TaskType.MAP ? "m" : "r").append("_");
builder.append(taskIdFormat.get().format(getId()));
return builder.toString();
}
@Override
public int compareTo(TaskId other) {
int jobIdComp = this.getJobId().compareTo(other.getJobId());
if (jobIdComp == 0) {
if (this.getTaskType() == other.getTaskType()) {
return this.getId() - other.getId();
} else {
return this.getTaskType().compareTo(other.getTaskType());
}
} else {
return jobIdComp;
}
}
}

View File

@ -18,34 +18,20 @@
package org.apache.hadoop.mapreduce.v2.api.records.impl.pb;
import java.text.NumberFormat;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobIdProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobIdProtoOrBuilder;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
public class JobIdPBImpl extends ProtoBase<JobIdProto> implements JobId {
protected static final String JOB = "job";
protected static final char SEPARATOR = '_';
protected static final NumberFormat idFormat = NumberFormat.getInstance();
static {
idFormat.setGroupingUsed(false);
idFormat.setMinimumIntegerDigits(4);
}
public class JobIdPBImpl extends JobId {
JobIdProto proto = JobIdProto.getDefaultInstance();
JobIdProto.Builder builder = null;
boolean viaProto = false;
private ApplicationId applicationId = null;
// boolean hasLocalAppId = false;
public JobIdPBImpl() {
builder = JobIdProto.newBuilder();
@ -56,9 +42,7 @@ public class JobIdPBImpl extends ProtoBase<JobIdProto> implements JobId {
viaProto = true;
}
@Override
public synchronized JobIdProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
@ -66,7 +50,9 @@ public class JobIdPBImpl extends ProtoBase<JobIdProto> implements JobId {
}
private synchronized void mergeLocalToBuilder() {
if (this.applicationId != null && !((ApplicationIdPBImpl)this.applicationId).getProto().equals(builder.getAppId())) {
if (this.applicationId != null
&& !((ApplicationIdPBImpl) this.applicationId).getProto().equals(
builder.getAppId())) {
builder.setAppId(convertToProtoFormat(this.applicationId));
}
}
@ -107,7 +93,6 @@ public class JobIdPBImpl extends ProtoBase<JobIdProto> implements JobId {
builder.clearAppId();
}
this.applicationId = appId;
// builder.setAppId(convertToProtoFormat(appId));
}
@Override
public synchronized int getId() {
@ -121,21 +106,12 @@ public class JobIdPBImpl extends ProtoBase<JobIdProto> implements JobId {
builder.setId((id));
}
private synchronized ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
private ApplicationIdPBImpl convertFromProtoFormat(
ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
private synchronized ApplicationIdProto convertToProtoFormat(ApplicationId t) {
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(JOB);
builder.append(SEPARATOR);
builder.append(getAppId().getClusterTimestamp());
builder.append(SEPARATOR);
builder.append(idFormat.format(getId()));
return builder.toString();
}
}

View File

@ -18,36 +18,19 @@
package org.apache.hadoop.mapreduce.v2.api.records.impl.pb;
import java.text.NumberFormat;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptIdProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptIdProtoOrBuilder;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskIdProto;
import org.apache.hadoop.yarn.api.records.ProtoBase;
public class TaskAttemptIdPBImpl extends ProtoBase<TaskAttemptIdProto> implements TaskAttemptId {
public class TaskAttemptIdPBImpl extends TaskAttemptId {
TaskAttemptIdProto proto = TaskAttemptIdProto.getDefaultInstance();
TaskAttemptIdProto.Builder builder = null;
boolean viaProto = false;
private TaskId taskId = null;
protected static final NumberFormat idFormat = NumberFormat.getInstance();
static {
idFormat.setGroupingUsed(false);
idFormat.setMinimumIntegerDigits(6);
}
protected static final NumberFormat jobidFormat = NumberFormat.getInstance();
static {
jobidFormat.setGroupingUsed(false);
jobidFormat.setMinimumIntegerDigits(4);
}
public TaskAttemptIdPBImpl() {
@ -59,20 +42,21 @@ public class TaskAttemptIdPBImpl extends ProtoBase<TaskAttemptIdProto> implement
viaProto = true;
}
public TaskAttemptIdProto getProto() {
public synchronized TaskAttemptIdProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.taskId != null && !((TaskIdPBImpl)this.taskId).getProto().equals(builder.getTaskId())) {
private synchronized void mergeLocalToBuilder() {
if (this.taskId != null
&& !((TaskIdPBImpl) this.taskId).getProto().equals(builder.getTaskId())) {
builder.setTaskId(convertToProtoFormat(this.taskId));
}
}
private void mergeLocalToProto() {
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
@ -80,7 +64,7 @@ public class TaskAttemptIdPBImpl extends ProtoBase<TaskAttemptIdProto> implement
viaProto = true;
}
private void maybeInitBuilder() {
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = TaskAttemptIdProto.newBuilder(proto);
}
@ -89,18 +73,18 @@ public class TaskAttemptIdPBImpl extends ProtoBase<TaskAttemptIdProto> implement
@Override
public int getId() {
public synchronized int getId() {
TaskAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
return (p.getId());
}
@Override
public void setId(int id) {
public synchronized void setId(int id) {
maybeInitBuilder();
builder.setId((id));
}
@Override
public TaskId getTaskId() {
public synchronized TaskId getTaskId() {
TaskAttemptIdProtoOrBuilder p = viaProto ? proto : builder;
if (this.taskId != null) {
return this.taskId;
@ -113,7 +97,7 @@ public class TaskAttemptIdPBImpl extends ProtoBase<TaskAttemptIdProto> implement
}
@Override
public void setTaskId(TaskId taskId) {
public synchronized void setTaskId(TaskId taskId) {
maybeInitBuilder();
if (taskId == null)
builder.clearTaskId();
@ -127,16 +111,4 @@ public class TaskAttemptIdPBImpl extends ProtoBase<TaskAttemptIdProto> implement
private TaskIdProto convertToProtoFormat(TaskId t) {
return ((TaskIdPBImpl)t).getProto();
}
@Override
public String toString() {
String identifier = (getTaskId() == null) ? "none":
getTaskId().getJobId().getAppId().getClusterTimestamp() + "_" +
jobidFormat.format(getTaskId().getJobId().getAppId().getId()) + "_" +
((getTaskId().getTaskType() == TaskType.MAP) ? "m" : "r") + "_" +
idFormat.format(getTaskId().getId()) + "_" +
getId();
return "attempt_" + identifier;
}
}

View File

@ -18,9 +18,6 @@
package org.apache.hadoop.mapreduce.v2.api.records.impl.pb;
import java.text.NumberFormat;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@ -29,30 +26,14 @@ import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskIdProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskIdProtoOrBuilder;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskTypeProto;
import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils;
import org.apache.hadoop.yarn.api.records.ProtoBase;
public class TaskIdPBImpl extends ProtoBase<TaskIdProto> implements TaskId {
public class TaskIdPBImpl extends TaskId {
TaskIdProto proto = TaskIdProto.getDefaultInstance();
TaskIdProto.Builder builder = null;
boolean viaProto = false;
protected static final NumberFormat idFormat = NumberFormat.getInstance();
static {
idFormat.setGroupingUsed(false);
idFormat.setMinimumIntegerDigits(6);
}
protected static final NumberFormat jobidFormat = NumberFormat.getInstance();
static {
jobidFormat.setGroupingUsed(false);
jobidFormat.setMinimumIntegerDigits(4);
}
private JobId jobId = null;
public TaskIdPBImpl() {
builder = TaskIdProto.newBuilder(proto);
}
@ -70,7 +51,8 @@ public class TaskIdPBImpl extends ProtoBase<TaskIdProto> implements TaskId {
}
private synchronized void mergeLocalToBuilder() {
if (this.jobId != null && !((JobIdPBImpl)this.jobId).getProto().equals(builder.getJobId()) ) {
if (this.jobId != null
&& !((JobIdPBImpl) this.jobId).getProto().equals(builder.getJobId())) {
builder.setJobId(convertToProtoFormat(this.jobId));
}
}
@ -90,7 +72,6 @@ public class TaskIdPBImpl extends ProtoBase<TaskIdProto> implements TaskId {
viaProto = false;
}
@Override
public synchronized int getId() {
TaskIdProtoOrBuilder p = viaProto ? proto : builder;
@ -102,6 +83,7 @@ public class TaskIdPBImpl extends ProtoBase<TaskIdProto> implements TaskId {
maybeInitBuilder();
builder.setId((id));
}
@Override
public synchronized JobId getJobId() {
TaskIdProtoOrBuilder p = viaProto ? proto : builder;
@ -122,6 +104,7 @@ public class TaskIdPBImpl extends ProtoBase<TaskIdProto> implements TaskId {
builder.clearJobId();
this.jobId = jobId;
}
@Override
public synchronized TaskType getTaskType() {
TaskIdProtoOrBuilder p = viaProto ? proto : builder;
@ -141,29 +124,19 @@ public class TaskIdPBImpl extends ProtoBase<TaskIdProto> implements TaskId {
builder.setTaskType(convertToProtoFormat(taskType));
}
private synchronized JobIdPBImpl convertFromProtoFormat(JobIdProto p) {
private JobIdPBImpl convertFromProtoFormat(JobIdProto p) {
return new JobIdPBImpl(p);
}
private synchronized JobIdProto convertToProtoFormat(JobId t) {
private JobIdProto convertToProtoFormat(JobId t) {
return ((JobIdPBImpl)t).getProto();
}
private synchronized TaskTypeProto convertToProtoFormat(TaskType e) {
private TaskTypeProto convertToProtoFormat(TaskType e) {
return MRProtoUtils.convertToProtoFormat(e);
}
private synchronized TaskType convertFromProtoFormat(TaskTypeProto e) {
private TaskType convertFromProtoFormat(TaskTypeProto e) {
return MRProtoUtils.convertFromProtoFormat(e);
}
@Override
public synchronized String toString() {
String jobIdentifier = (jobId == null) ? "none":
jobId.getAppId().getClusterTimestamp() + "_" +
jobidFormat.format(jobId.getAppId().getId()) + "_" +
((getTaskType() == TaskType.MAP) ? "m":"r") + "_" + idFormat.format(getId());
return "task_" + jobIdentifier;
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.util;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class MRBuilderUtils {
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
public static JobId newJobId(ApplicationId appId, int id) {
JobId jobId = recordFactory.newRecordInstance(JobId.class);
jobId.setAppId(appId);
jobId.setId(id);
return jobId;
}
public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) {
TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
taskId.setJobId(jobId);
taskId.setId(id);
taskId.setTaskType(taskType);
return taskId;
}
public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) {
TaskAttemptId taskAttemptId =
recordFactory.newRecordInstance(TaskAttemptId.class);
taskAttemptId.setTaskId(taskId);
taskAttemptId.setId(attemptId);
return taskAttemptId;
}
}

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.records;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestIds {
@Test
public void testJobId() {
long ts1 = 1315890136000l;
long ts2 = 1315890136001l;
JobId j1 = createJobId(ts1, 2);
JobId j2 = createJobId(ts1, 1);
JobId j3 = createJobId(ts2, 1);
JobId j4 = createJobId(ts1, 2);
assertTrue(j1.equals(j4));
assertFalse(j1.equals(j2));
assertFalse(j1.equals(j3));
assertTrue(j1.compareTo(j4) == 0);
assertTrue(j1.compareTo(j2) > 0);
assertTrue(j1.compareTo(j3) < 0);
assertTrue(j1.hashCode() == j4.hashCode());
assertFalse(j1.hashCode() == j2.hashCode());
assertFalse(j1.hashCode() == j3.hashCode());
JobId j5 = createJobId(ts1, 231415);
assertEquals("job_" + ts1 + "_0002", j1.toString());
assertEquals("job_" + ts1 + "_231415", j5.toString());
}
@Test
public void testTaskId() {
long ts1 = 1315890136000l;
long ts2 = 1315890136001l;
TaskId t1 = createTaskId(ts1, 1, 2, TaskType.MAP);
TaskId t2 = createTaskId(ts1, 1, 2, TaskType.REDUCE);
TaskId t3 = createTaskId(ts1, 1, 1, TaskType.MAP);
TaskId t4 = createTaskId(ts1, 1, 2, TaskType.MAP);
TaskId t5 = createTaskId(ts2, 1, 1, TaskType.MAP);
assertTrue(t1.equals(t4));
assertFalse(t1.equals(t2));
assertFalse(t1.equals(t3));
assertFalse(t1.equals(t5));
assertTrue(t1.compareTo(t4) == 0);
assertTrue(t1.compareTo(t2) < 0);
assertTrue(t1.compareTo(t3) > 0);
assertTrue(t1.compareTo(t5) < 0);
assertTrue(t1.hashCode() == t4.hashCode());
assertFalse(t1.hashCode() == t2.hashCode());
assertFalse(t1.hashCode() == t3.hashCode());
assertFalse(t1.hashCode() == t5.hashCode());
TaskId t6 = createTaskId(ts1, 324151, 54643747, TaskType.REDUCE);
assertEquals("task_" + ts1 + "_0001_m_000002", t1.toString());
assertEquals("task_" + ts1 + "_324151_r_54643747", t6.toString());
}
@Test
public void testTaskAttemptId() {
long ts1 = 1315890136000l;
long ts2 = 1315890136001l;
TaskAttemptId t1 = createTaskAttemptId(ts1, 2, 2, TaskType.MAP, 2);
TaskAttemptId t2 = createTaskAttemptId(ts1, 2, 2, TaskType.REDUCE, 2);
TaskAttemptId t3 = createTaskAttemptId(ts1, 2, 2, TaskType.MAP, 3);
TaskAttemptId t4 = createTaskAttemptId(ts1, 2, 2, TaskType.MAP, 1);
TaskAttemptId t5 = createTaskAttemptId(ts1, 2, 1, TaskType.MAP, 3);
TaskAttemptId t6 = createTaskAttemptId(ts1, 2, 2, TaskType.MAP, 2);
assertTrue(t1.equals(t6));
assertFalse(t1.equals(t2));
assertFalse(t1.equals(t3));
assertFalse(t1.equals(t5));
assertTrue(t1.compareTo(t6) == 0);
assertTrue(t1.compareTo(t2) < 0);
assertTrue(t1.compareTo(t3) < 0);
assertTrue(t1.compareTo(t4) > 0);
assertTrue(t1.compareTo(t5) > 0);
assertTrue(t1.hashCode() == t6.hashCode());
assertFalse(t1.hashCode() == t2.hashCode());
assertFalse(t1.hashCode() == t3.hashCode());
assertFalse(t1.hashCode() == t5.hashCode());
TaskAttemptId t7 =
createTaskAttemptId(ts2, 5463346, 4326575, TaskType.REDUCE, 54375);
assertEquals("attempt_" + ts1 + "_0002_m_000002_2", t1.toString());
assertEquals("attempt_" + ts2 + "_5463346_r_4326575_54375", t7.toString());
}
private JobId createJobId(long clusterTimestamp, int idInt) {
return MRBuilderUtils.newJobId(
BuilderUtils.newApplicationId(clusterTimestamp, idInt), idInt);
}
private TaskId createTaskId(long clusterTimestamp, int jobIdInt,
int taskIdInt, TaskType taskType) {
return MRBuilderUtils.newTaskId(createJobId(clusterTimestamp, jobIdInt),
taskIdInt, taskType);
}
private TaskAttemptId createTaskAttemptId(long clusterTimestamp,
int jobIdInt, int taskIdInt, TaskType taskType, int taskAttemptIdInt) {
return MRBuilderUtils.newTaskAttemptId(
createTaskId(clusterTimestamp, jobIdInt, taskIdInt, taskType),
taskAttemptIdInt);
}
}

View File

@ -38,6 +38,9 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
public abstract class ApplicationAttemptId implements
Comparable<ApplicationAttemptId> {
int id;
ApplicationId appId;
/**
* Get the <code>ApplicationId</code> of the <code>ApplicationAttempId</code>.
* @return <code>ApplicationId</code> of the <code>ApplicationAttempId</code>
@ -60,20 +63,16 @@ public abstract class ApplicationAttemptId implements
@Unstable
public abstract void setAttemptId(int attemptId);
protected static final NumberFormat idFormat = NumberFormat.getInstance();
static {
idFormat.setGroupingUsed(false);
idFormat.setMinimumIntegerDigits(4);
}
protected static final NumberFormat counterFormat = NumberFormat
.getInstance();
static {
counterFormat.setGroupingUsed(false);
counterFormat.setMinimumIntegerDigits(6);
static final ThreadLocal<NumberFormat> attemptIdFormat =
new ThreadLocal<NumberFormat>() {
@Override
public NumberFormat initialValue() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setGroupingUsed(false);
fmt.setMinimumIntegerDigits(6);
return fmt;
}
};
@Override
public int hashCode() {
@ -81,22 +80,25 @@ public abstract class ApplicationAttemptId implements
final int prime = 31;
int result = 1;
ApplicationId appId = getApplicationId();
result = prime * result + ((appId == null) ? 0 : appId.hashCode());
result = prime * result + appId.hashCode();
result = prime * result + getAttemptId();
return result;
}
@Override
public boolean equals(Object other) {
if (other == null)
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
ApplicationAttemptId otherAttemptId = (ApplicationAttemptId) other;
if (this.getApplicationId().equals(otherAttemptId.getApplicationId())) {
return this.getAttemptId() == otherAttemptId.getAttemptId();
}
}
if (getClass() != obj.getClass())
return false;
ApplicationAttemptId other = (ApplicationAttemptId) obj;
if (!this.getApplicationId().equals(other.getApplicationId()))
return false;
if (this.getAttemptId() != other.getAttemptId())
return false;
return true;
}
@Override
@ -112,11 +114,11 @@ public abstract class ApplicationAttemptId implements
@Override
public String toString() {
String id =
(this.getApplicationId() != null) ? this.getApplicationId()
.getClusterTimestamp()
+ "_"
+ idFormat.format(this.getApplicationId().getId()) : "none";
return "appattempt_" + id + "_" + counterFormat.format(getAttemptId());
StringBuilder sb = new StringBuilder("appattempt_");
sb.append(this.getApplicationId().getClusterTimestamp()).append("_");
sb.append(ApplicationId.appIdFormat.get().format(
this.getApplicationId().getId()));
sb.append("_").append(attemptIdFormat.get().format(getAttemptId()));
return sb.toString();
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.api.records;
import java.text.NumberFormat;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@ -62,6 +64,18 @@ public abstract class ApplicationId implements Comparable<ApplicationId> {
public abstract void setClusterTimestamp(long clusterTimestamp);
static final ThreadLocal<NumberFormat> appIdFormat =
new ThreadLocal<NumberFormat>() {
@Override
public NumberFormat initialValue() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setGroupingUsed(false);
fmt.setMinimumIntegerDigits(4);
return fmt;
}
};
@Override
public int compareTo(ApplicationId other) {
if (this.getClusterTimestamp() - other.getClusterTimestamp() == 0) {
@ -74,7 +88,8 @@ public abstract class ApplicationId implements Comparable<ApplicationId> {
@Override
public String toString() {
return "application_" + this.getClusterTimestamp() + "_" + this.getId();
return "application_" + this.getClusterTimestamp() + "_"
+ appIdFormat.get().format(getId());
}
@Override
@ -90,15 +105,18 @@ public abstract class ApplicationId implements Comparable<ApplicationId> {
}
@Override
public boolean equals(Object other) {
if (other == null) return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
ApplicationId otherAppId = (ApplicationId)other;
if (this.getClusterTimestamp() == otherAppId.getClusterTimestamp() &&
this.getId() == otherAppId.getId()) {
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ApplicationId other = (ApplicationId) obj;
if (this.getClusterTimestamp() != other.getClusterTimestamp())
return false;
if (this.getId() != other.getId())
return false;
return true;
}
}
return false;
}
}

View File

@ -59,19 +59,6 @@ public abstract class ContainerId implements Comparable<ContainerId>{
public abstract void setId(int id);
// TODO: Why thread local?
// ^ NumberFormat instances are not threadsafe
private static final ThreadLocal<NumberFormat> appIdFormat =
new ThreadLocal<NumberFormat>() {
@Override
public NumberFormat initialValue() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setGroupingUsed(false);
fmt.setMinimumIntegerDigits(4);
return fmt;
}
};
// TODO: fail the app submission if attempts are more than 10 or something
private static final ThreadLocal<NumberFormat> appAttemptIdFormat =
new ThreadLocal<NumberFormat>() {
@ -102,24 +89,24 @@ public abstract class ContainerId implements Comparable<ContainerId>{
final int prime = 31;
int result = 1;
result = prime * result + getId();
result = prime * result
+ ((getApplicationAttemptId() == null) ? 0 : getApplicationAttemptId().hashCode());
result = prime * result + getApplicationAttemptId().hashCode();
return result;
}
@Override
public boolean equals(Object other) {
if (other == null) {
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
ContainerId otherCId = (ContainerId)other;
if (this.getApplicationAttemptId().equals(
otherCId.getApplicationAttemptId())) {
return this.getId() == otherCId.getId();
}
}
if (getClass() != obj.getClass())
return false;
ContainerId other = (ContainerId) obj;
if (!this.getApplicationAttemptId().equals(other.getApplicationAttemptId()))
return false;
if (this.getId() != other.getId())
return false;
return true;
}
@Override
@ -137,11 +124,14 @@ public abstract class ContainerId implements Comparable<ContainerId>{
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("container_");
ApplicationId appId = getApplicationAttemptId().getApplicationId();
sb.append("container_").append(appId.getClusterTimestamp()).append("_");
sb.append(appIdFormat.get().format(appId.getId())).append("_");
sb.append(appAttemptIdFormat.get().format(getApplicationAttemptId().
getAttemptId())).append("_");
sb.append(appId.getClusterTimestamp()).append("_");
sb.append(ApplicationId.appIdFormat.get().format(appId.getId()))
.append("_");
sb.append(
appAttemptIdFormat.get().format(
getApplicationAttemptId().getAttemptId())).append("_");
sb.append(containerIdFormat.get().format(getId()));
return sb.toString();
}

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
*/
@Public
@Stable
public interface NodeId extends Comparable<NodeId> {
public abstract class NodeId implements Comparable<NodeId> {
/**
* Get the <em>hostname</em> of the node.
@ -40,11 +40,11 @@ public interface NodeId extends Comparable<NodeId> {
*/
@Public
@Stable
String getHost();
public abstract String getHost();
@Private
@Unstable
void setHost(String host);
public abstract void setHost(String host);
/**
* Get the <em>port</em> for communicating with the node.
@ -52,9 +52,54 @@ public interface NodeId extends Comparable<NodeId> {
*/
@Public
@Stable
int getPort();
public abstract int getPort();
@Private
@Unstable
void setPort(int port);
public abstract void setPort(int port);
@Override
public String toString() {
return this.getHost() + ":" + this.getPort();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + this.getHost().hashCode();
result = prime * result + this.getPort();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (getClass() != obj.getClass())
return false;
NodeId other = (NodeId) obj;
if (!this.getHost().equals(other.getHost()))
return false;
if (this.getPort() != other.getPort())
return false;
return true;
}
@Override
public int compareTo(NodeId other) {
int hostCompare = this.getHost().compareTo(other.getHost());
if (hostCompare == 0) {
if (this.getPort() > other.getPort()) {
return 1;
} else if (this.getPort() < other.getPort()) {
return -1;
}
return 0;
}
return hostCompare;
}
}

View File

@ -25,7 +25,8 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProtoOrBuilde
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
public class ApplicationAttemptIdPBImpl extends ApplicationAttemptId {
ApplicationAttemptIdProto proto = ApplicationAttemptIdProto.getDefaultInstance();
ApplicationAttemptIdProto proto = ApplicationAttemptIdProto
.getDefaultInstance();
ApplicationAttemptIdProto.Builder builder = null;
boolean viaProto = false;
@ -48,7 +49,9 @@ public class ApplicationAttemptIdPBImpl extends ApplicationAttemptId {
}
private synchronized void mergeLocalToBuilder() {
if (this.applicationId != null && !((ApplicationIdPBImpl)applicationId).getProto().equals(builder.getApplicationId())) {
if (this.applicationId != null
&& !((ApplicationIdPBImpl) applicationId).getProto().equals(
builder.getApplicationId())) {
builder.setApplicationId(convertToProtoFormat(this.applicationId));
}
}

View File

@ -20,13 +20,12 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProtoOrBuilder;
public class NodeIdPBImpl extends ProtoBase<NodeIdProto> implements NodeId {
public class NodeIdPBImpl extends NodeId {
NodeIdProto proto = NodeIdProto.getDefaultInstance();
NodeIdProto.Builder builder = null;
boolean viaProto = false;
@ -40,13 +39,13 @@ public class NodeIdPBImpl extends ProtoBase<NodeIdProto> implements NodeId {
viaProto = true;
}
public NodeIdProto getProto() {
public synchronized NodeIdProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = NodeIdProto.newBuilder(proto);
}
@ -54,77 +53,26 @@ public class NodeIdPBImpl extends ProtoBase<NodeIdProto> implements NodeId {
}
@Override
public String getHost() {
public synchronized String getHost() {
NodeIdProtoOrBuilder p = viaProto ? proto : builder;
return (p.getHost());
}
@Override
public void setHost(String host) {
public synchronized void setHost(String host) {
maybeInitBuilder();
builder.setHost((host));
}
@Override
public int getPort() {
public synchronized int getPort() {
NodeIdProtoOrBuilder p = viaProto ? proto : builder;
return (p.getPort());
}
@Override
public void setPort(int port) {
public synchronized void setPort(int port) {
maybeInitBuilder();
builder.setPort((port));
}
@Override
public String toString() {
return this.getHost() + ":" + this.getPort();
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
String host = this.getHost();
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + this.getPort();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (getClass() != obj.getClass())
return false;
NodeIdPBImpl other = (NodeIdPBImpl) obj;
String host = this.getHost();
String otherHost = other.getHost();
if (host == null) {
if (otherHost != null)
return false;
} else if (!host.equals(otherHost))
return false;
if (this.getPort() != other.getPort())
return false;
return true;
}
@Override
public int compareTo(NodeId other) {
int hostCompare = this.getHost().compareTo(other.getHost());
if (hostCompare == 0) {
if (this.getPort() > other.getPort()) {
return 1;
} else if (this.getPort() < other.getPort()) {
return -1;
}
return 0;
}
return hostCompare;
}
}

View File

@ -1,3 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import junit.framework.Assert;
@ -32,6 +51,10 @@ public class TestApplicationAttemptId {
Assert.assertFalse(a1.hashCode() == a3.hashCode());
Assert.assertFalse(a1.hashCode() == a4.hashCode());
long ts = System.currentTimeMillis();
ApplicationAttemptId a6 = createAppAttemptId(ts, 543627, 33492611);
Assert.assertEquals("appattempt_10_0001_000001", a1.toString());
Assert.assertEquals("appattempt_" + ts + "_543627_33492611", a6.toString());
}
private ApplicationAttemptId createAppAttemptId(long clusterTimeStamp,
@ -45,4 +68,9 @@ public class TestApplicationAttemptId {
appAttemptId.setAttemptId(attemptId);
return appAttemptId;
}
public static void main(String[] args) throws Exception {
TestApplicationAttemptId t = new TestApplicationAttemptId();
t.testApplicationAttemptId();
}
}

View File

@ -1,3 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import junit.framework.Assert;
@ -26,6 +44,11 @@ public class TestApplicationId {
Assert.assertTrue(a1.hashCode() == a3.hashCode());
Assert.assertFalse(a1.hashCode() == a2.hashCode());
Assert.assertFalse(a2.hashCode() == a4.hashCode());
long ts = System.currentTimeMillis();
ApplicationId a5 = createAppId(ts, 45436343);
Assert.assertEquals("application_10_0001", a1.toString());
Assert.assertEquals("application_" + ts + "_45436343", a5.toString());
}
private ApplicationId createAppId(long clusterTimeStamp, int id) {

View File

@ -1,3 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import junit.framework.Assert;
@ -32,6 +51,12 @@ public class TestContainerId {
Assert.assertFalse(c1.hashCode() == c2.hashCode());
Assert.assertFalse(c1.hashCode() == c4.hashCode());
Assert.assertFalse(c1.hashCode() == c5.hashCode());
long ts = System.currentTimeMillis();
ContainerId c6 = createContainerId(ts, 36473, 4365472, 25645811);
Assert.assertEquals("container_10_0001_01_000001", c1.toString());
Assert.assertEquals("container_" + ts + "_36473_4365472_25645811",
c6.toString());
}
private ContainerId createContainerId(long clusterTimestamp, int appIdInt,