MAPREDUCE-2954. Fixed a deadlock in NM caused due to wrong synchronization in protocol buffer records. Contributed by Siddharth Seth.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1167061 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
85e17fd2df
commit
1d84d983a2
|
@ -1256,6 +1256,9 @@ Release 0.23.0 - Unreleased
|
||||||
|
|
||||||
MAPREDUCE-2963. Fix hang in TestMRJobs. (Siddharth Seth via acmurthy)
|
MAPREDUCE-2963. Fix hang in TestMRJobs. (Siddharth Seth via acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-2954. Fixed a deadlock in NM caused due to wrong synchronization
|
||||||
|
in protocol buffer records. (Siddharth Seth via vinodkv)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -18,11 +18,73 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records;
|
package org.apache.hadoop.yarn.api.records;
|
||||||
|
|
||||||
public interface ApplicationAttemptId extends Comparable<ApplicationAttemptId>{
|
import java.text.NumberFormat;
|
||||||
|
|
||||||
|
public abstract class ApplicationAttemptId implements
|
||||||
|
Comparable<ApplicationAttemptId> {
|
||||||
public abstract ApplicationId getApplicationId();
|
public abstract ApplicationId getApplicationId();
|
||||||
public abstract int getAttemptId();
|
public abstract int getAttemptId();
|
||||||
|
|
||||||
public abstract void setApplicationId(ApplicationId appID);
|
public abstract void setApplicationId(ApplicationId appID);
|
||||||
public abstract void setAttemptId(int attemptId);
|
public abstract void setAttemptId(int attemptId);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
protected static final NumberFormat idFormat = NumberFormat.getInstance();
|
||||||
|
static {
|
||||||
|
idFormat.setGroupingUsed(false);
|
||||||
|
idFormat.setMinimumIntegerDigits(4);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static final NumberFormat counterFormat = NumberFormat
|
||||||
|
.getInstance();
|
||||||
|
static {
|
||||||
|
counterFormat.setGroupingUsed(false);
|
||||||
|
counterFormat.setMinimumIntegerDigits(6);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
// Generated by eclipse.
|
||||||
|
final int prime = 31;
|
||||||
|
int result = 1;
|
||||||
|
ApplicationId appId = getApplicationId();
|
||||||
|
result = prime * result + ((appId == null) ? 0 : appId.hashCode());
|
||||||
|
result = prime * result + getAttemptId();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (other == null)
|
||||||
|
return false;
|
||||||
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
|
ApplicationAttemptId otherAttemptId = (ApplicationAttemptId) other;
|
||||||
|
if (this.getApplicationId().equals(otherAttemptId.getApplicationId())) {
|
||||||
|
return this.getAttemptId() == otherAttemptId.getAttemptId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(ApplicationAttemptId other) {
|
||||||
|
int compareAppIds = this.getApplicationId().compareTo(
|
||||||
|
other.getApplicationId());
|
||||||
|
if (compareAppIds == 0) {
|
||||||
|
return this.getAttemptId() - other.getAttemptId();
|
||||||
|
} else {
|
||||||
|
return compareAppIds;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
String id =
|
||||||
|
(this.getApplicationId() != null) ? this.getApplicationId()
|
||||||
|
.getClusterTimestamp()
|
||||||
|
+ "_"
|
||||||
|
+ idFormat.format(this.getApplicationId().getId()) : "none";
|
||||||
|
return "appattempt_" + id + "_" + counterFormat.format(getAttemptId());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,13 +18,51 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records;
|
package org.apache.hadoop.yarn.api.records;
|
||||||
|
|
||||||
|
public abstract class ApplicationId implements Comparable<ApplicationId> {
|
||||||
|
|
||||||
public interface ApplicationId extends Comparable<ApplicationId> {
|
|
||||||
public abstract int getId();
|
public abstract int getId();
|
||||||
public abstract long getClusterTimestamp();
|
public abstract long getClusterTimestamp();
|
||||||
|
|
||||||
public abstract void setId(int id);
|
public abstract void setId(int id);
|
||||||
public abstract void setClusterTimestamp(long clusterTimestamp);
|
public abstract void setClusterTimestamp(long clusterTimestamp);
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(ApplicationId other) {
|
||||||
|
if (this.getClusterTimestamp() - other.getClusterTimestamp() == 0) {
|
||||||
|
return this.getId() - other.getId();
|
||||||
|
} else {
|
||||||
|
return this.getClusterTimestamp() > other.getClusterTimestamp() ? 1 :
|
||||||
|
this.getClusterTimestamp() < other.getClusterTimestamp() ? -1 : 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "application_" + this.getClusterTimestamp() + "_" + this.getId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
// Generated by eclipse.
|
||||||
|
final int prime = 31;
|
||||||
|
int result = 1;
|
||||||
|
long clusterTimestamp = getClusterTimestamp();
|
||||||
|
result = prime * result
|
||||||
|
+ (int) (clusterTimestamp ^ (clusterTimestamp >>> 32));
|
||||||
|
result = prime * result + getId();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (other == null) return false;
|
||||||
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
|
ApplicationId otherAppId = (ApplicationId)other;
|
||||||
|
if (this.getClusterTimestamp() == otherAppId.getClusterTimestamp() &&
|
||||||
|
this.getId() == otherAppId.getId()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records;
|
package org.apache.hadoop.yarn.api.records;
|
||||||
|
|
||||||
public interface ContainerId extends Comparable<ContainerId>{
|
import java.text.NumberFormat;
|
||||||
|
|
||||||
|
public abstract class ContainerId implements Comparable<ContainerId>{
|
||||||
public abstract ApplicationAttemptId getAppAttemptId();
|
public abstract ApplicationAttemptId getAppAttemptId();
|
||||||
public abstract ApplicationId getAppId();
|
public abstract ApplicationId getAppId();
|
||||||
public abstract int getId();
|
public abstract int getId();
|
||||||
|
@ -26,5 +28,89 @@ public interface ContainerId extends Comparable<ContainerId>{
|
||||||
public abstract void setAppAttemptId(ApplicationAttemptId atId);
|
public abstract void setAppAttemptId(ApplicationAttemptId atId);
|
||||||
public abstract void setAppId(ApplicationId appID);
|
public abstract void setAppId(ApplicationId appID);
|
||||||
public abstract void setId(int id);
|
public abstract void setId(int id);
|
||||||
|
|
||||||
|
|
||||||
|
// TODO: Why thread local?
|
||||||
|
// ^ NumberFormat instances are not threadsafe
|
||||||
|
private static final ThreadLocal<NumberFormat> appIdFormat =
|
||||||
|
new ThreadLocal<NumberFormat>() {
|
||||||
|
@Override
|
||||||
|
public NumberFormat initialValue() {
|
||||||
|
NumberFormat fmt = NumberFormat.getInstance();
|
||||||
|
fmt.setGroupingUsed(false);
|
||||||
|
fmt.setMinimumIntegerDigits(4);
|
||||||
|
return fmt;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: fail the app submission if attempts are more than 10 or something
|
||||||
|
private static final ThreadLocal<NumberFormat> appAttemptIdFormat =
|
||||||
|
new ThreadLocal<NumberFormat>() {
|
||||||
|
@Override
|
||||||
|
public NumberFormat initialValue() {
|
||||||
|
NumberFormat fmt = NumberFormat.getInstance();
|
||||||
|
fmt.setGroupingUsed(false);
|
||||||
|
fmt.setMinimumIntegerDigits(2);
|
||||||
|
return fmt;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// TODO: Why thread local?
|
||||||
|
// ^ NumberFormat instances are not threadsafe
|
||||||
|
private static final ThreadLocal<NumberFormat> containerIdFormat =
|
||||||
|
new ThreadLocal<NumberFormat>() {
|
||||||
|
@Override
|
||||||
|
public NumberFormat initialValue() {
|
||||||
|
NumberFormat fmt = NumberFormat.getInstance();
|
||||||
|
fmt.setGroupingUsed(false);
|
||||||
|
fmt.setMinimumIntegerDigits(6);
|
||||||
|
return fmt;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
// Generated by eclipse.
|
||||||
|
final int prime = 31;
|
||||||
|
int result = 1;
|
||||||
|
result = prime * result + getId();
|
||||||
|
result = prime * result
|
||||||
|
+ ((getAppAttemptId() == null) ? 0 : getAppAttemptId().hashCode());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (other == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
|
ContainerId otherCId = (ContainerId)other;
|
||||||
|
if (this.getAppAttemptId().equals(otherCId.getAppAttemptId())) {
|
||||||
|
return this.getId() == otherCId.getId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(ContainerId other) {
|
||||||
|
if (this.getAppAttemptId().compareTo(other.getAppAttemptId()) == 0) {
|
||||||
|
return this.getId() - other.getId();
|
||||||
|
} else {
|
||||||
|
return this.getAppAttemptId().compareTo(other.getAppAttemptId());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
ApplicationId appId = getAppId();
|
||||||
|
sb.append("container_").append(appId.getClusterTimestamp()).append("_");
|
||||||
|
sb.append(appIdFormat.get().format(appId.getId())).append("_");
|
||||||
|
sb.append(appAttemptIdFormat.get().format(getAppAttemptId().
|
||||||
|
getAttemptId())).append("_");
|
||||||
|
sb.append(containerIdFormat.get().format(getId()));
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,35 +18,19 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||||
|
|
||||||
|
|
||||||
import java.text.NumberFormat;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
|
|
||||||
public class ApplicationAttemptIdPBImpl extends ProtoBase<ApplicationAttemptIdProto> implements ApplicationAttemptId {
|
public class ApplicationAttemptIdPBImpl extends ApplicationAttemptId {
|
||||||
ApplicationAttemptIdProto proto = ApplicationAttemptIdProto.getDefaultInstance();
|
ApplicationAttemptIdProto proto = ApplicationAttemptIdProto.getDefaultInstance();
|
||||||
ApplicationAttemptIdProto.Builder builder = null;
|
ApplicationAttemptIdProto.Builder builder = null;
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
|
||||||
private ApplicationId applicationId = null;
|
private ApplicationId applicationId = null;
|
||||||
protected static final NumberFormat idFormat = NumberFormat.getInstance();
|
|
||||||
static {
|
|
||||||
idFormat.setGroupingUsed(false);
|
|
||||||
idFormat.setMinimumIntegerDigits(4);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static final NumberFormat counterFormat = NumberFormat.getInstance();
|
|
||||||
static {
|
|
||||||
counterFormat.setGroupingUsed(false);
|
|
||||||
counterFormat.setMinimumIntegerDigits(6);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public ApplicationAttemptIdPBImpl() {
|
public ApplicationAttemptIdPBImpl() {
|
||||||
builder = ApplicationAttemptIdProto.newBuilder();
|
builder = ApplicationAttemptIdProto.newBuilder();
|
||||||
}
|
}
|
||||||
|
@ -117,44 +101,11 @@ public class ApplicationAttemptIdPBImpl extends ProtoBase<ApplicationAttemptIdPr
|
||||||
this.applicationId = appId;
|
this.applicationId = appId;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
||||||
return new ApplicationIdPBImpl(p);
|
return new ApplicationIdPBImpl(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
||||||
return ((ApplicationIdPBImpl)t).getProto();
|
return ((ApplicationIdPBImpl)t).getProto();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized int hashCode() {
|
|
||||||
return getProto().hashCode();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized boolean equals(Object other) {
|
|
||||||
if (other == null) return false;
|
|
||||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
|
||||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized int compareTo(ApplicationAttemptId other) {
|
|
||||||
int compareAppIds = this.getApplicationId().compareTo(
|
|
||||||
other.getApplicationId());
|
|
||||||
if (compareAppIds == 0) {
|
|
||||||
return this.getAttemptId() - other.getAttemptId();
|
|
||||||
} else {
|
|
||||||
return compareAppIds;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized String toString() {
|
|
||||||
String id = (this.getApplicationId() != null) ? this.getApplicationId().getClusterTimestamp() + "_" +
|
|
||||||
idFormat.format(this.getApplicationId().getId()): "none";
|
|
||||||
return "appattempt_" + id + "_" + counterFormat.format(getAttemptId());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,13 +20,12 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProtoOrBuilder;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public class ApplicationIdPBImpl extends ProtoBase<ApplicationIdProto> implements ApplicationId {
|
public class ApplicationIdPBImpl extends ApplicationId {
|
||||||
ApplicationIdProto proto = ApplicationIdProto.getDefaultInstance();
|
ApplicationIdProto proto = ApplicationIdProto.getDefaultInstance();
|
||||||
ApplicationIdProto.Builder builder = null;
|
ApplicationIdProto.Builder builder = null;
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
@ -40,13 +39,13 @@ public class ApplicationIdPBImpl extends ProtoBase<ApplicationIdProto> implement
|
||||||
viaProto = true;
|
viaProto = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ApplicationIdProto getProto() {
|
public synchronized ApplicationIdProto getProto() {
|
||||||
proto = viaProto ? proto : builder.build();
|
proto = viaProto ? proto : builder.build();
|
||||||
viaProto = true;
|
viaProto = true;
|
||||||
return proto;
|
return proto;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void maybeInitBuilder() {
|
private synchronized void maybeInitBuilder() {
|
||||||
if (viaProto || builder == null) {
|
if (viaProto || builder == null) {
|
||||||
builder = ApplicationIdProto.newBuilder(proto);
|
builder = ApplicationIdProto.newBuilder(proto);
|
||||||
}
|
}
|
||||||
|
@ -55,40 +54,25 @@ public class ApplicationIdPBImpl extends ProtoBase<ApplicationIdProto> implement
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getId() {
|
public synchronized int getId() {
|
||||||
ApplicationIdProtoOrBuilder p = viaProto ? proto : builder;
|
ApplicationIdProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
return (p.getId());
|
return (p.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setId(int id) {
|
public synchronized void setId(int id) {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
builder.setId((id));
|
builder.setId((id));
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public long getClusterTimestamp() {
|
public synchronized long getClusterTimestamp() {
|
||||||
ApplicationIdProtoOrBuilder p = viaProto ? proto : builder;
|
ApplicationIdProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
return (p.getClusterTimestamp());
|
return (p.getClusterTimestamp());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setClusterTimestamp(long clusterTimestamp) {
|
public synchronized void setClusterTimestamp(long clusterTimestamp) {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
builder.setClusterTimestamp((clusterTimestamp));
|
builder.setClusterTimestamp((clusterTimestamp));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
@Override
|
|
||||||
public int compareTo(ApplicationId other) {
|
|
||||||
if (this.getId() - other.getId() == 0) {
|
|
||||||
return this.getClusterTimestamp() > other.getClusterTimestamp() ? 1 :
|
|
||||||
this.getClusterTimestamp() < other.getClusterTimestamp() ? -1 : 0;
|
|
||||||
} else {
|
|
||||||
return this.getId() - other.getId();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "application_" + this.getClusterTimestamp() + "_" + this.getId();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -18,72 +18,23 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||||
|
|
||||||
|
|
||||||
import java.text.NumberFormat;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProtoOrBuilder;
|
||||||
|
|
||||||
|
|
||||||
public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements ContainerId {
|
public class ContainerIdPBImpl extends ContainerId {
|
||||||
ContainerIdProto proto = ContainerIdProto.getDefaultInstance();
|
ContainerIdProto proto = ContainerIdProto.getDefaultInstance();
|
||||||
ContainerIdProto.Builder builder = null;
|
ContainerIdProto.Builder builder = null;
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
|
||||||
private ApplicationId applicationId = null;
|
private ApplicationId applicationId = null;
|
||||||
private ApplicationAttemptId appAttemptId = null;
|
private ApplicationAttemptId appAttemptId = null;
|
||||||
protected static final NumberFormat idFormat = NumberFormat.getInstance();
|
|
||||||
static {
|
|
||||||
idFormat.setGroupingUsed(false);
|
|
||||||
idFormat.setMinimumIntegerDigits(4);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static final NumberFormat counterFormat = NumberFormat.getInstance();
|
|
||||||
static {
|
|
||||||
counterFormat.setGroupingUsed(false);
|
|
||||||
counterFormat.setMinimumIntegerDigits(6);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Why thread local?
|
|
||||||
// ^ NumberFormat instances are not threadsafe
|
|
||||||
private static final ThreadLocal<NumberFormat> appIdFormat = new ThreadLocal<NumberFormat>() {
|
|
||||||
@Override
|
|
||||||
public NumberFormat initialValue() {
|
|
||||||
NumberFormat fmt = NumberFormat.getInstance();
|
|
||||||
fmt.setGroupingUsed(false);
|
|
||||||
fmt.setMinimumIntegerDigits(4);
|
|
||||||
return fmt;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: fail the app submission if attempts are more than 10 or something
|
|
||||||
private static final ThreadLocal<NumberFormat> appAttemptIdFormat = new ThreadLocal<NumberFormat>() {
|
|
||||||
@Override
|
|
||||||
public NumberFormat initialValue() {
|
|
||||||
NumberFormat fmt = NumberFormat.getInstance();
|
|
||||||
fmt.setGroupingUsed(false);
|
|
||||||
fmt.setMinimumIntegerDigits(2);
|
|
||||||
return fmt;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// TODO: Why thread local?
|
|
||||||
// ^ NumberFormat instances are not threadsafe
|
|
||||||
private static final ThreadLocal<NumberFormat> containerIdFormat = new ThreadLocal<NumberFormat>() {
|
|
||||||
@Override
|
|
||||||
public NumberFormat initialValue() {
|
|
||||||
NumberFormat fmt = NumberFormat.getInstance();
|
|
||||||
fmt.setGroupingUsed(false);
|
|
||||||
fmt.setMinimumIntegerDigits(6);
|
|
||||||
return fmt;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
public ContainerIdPBImpl() {
|
public ContainerIdPBImpl() {
|
||||||
builder = ContainerIdProto.newBuilder();
|
builder = ContainerIdProto.newBuilder();
|
||||||
}
|
}
|
||||||
|
@ -93,14 +44,14 @@ public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements Co
|
||||||
viaProto = true;
|
viaProto = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContainerIdProto getProto() {
|
public synchronized ContainerIdProto getProto() {
|
||||||
mergeLocalToProto();
|
mergeLocalToProto();
|
||||||
proto = viaProto ? proto : builder.build();
|
proto = viaProto ? proto : builder.build();
|
||||||
viaProto = true;
|
viaProto = true;
|
||||||
return proto;
|
return proto;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mergeLocalToBuilder() {
|
private synchronized void mergeLocalToBuilder() {
|
||||||
if (this.applicationId != null && !((ApplicationIdPBImpl)applicationId).getProto().equals(builder.getAppId())) {
|
if (this.applicationId != null && !((ApplicationIdPBImpl)applicationId).getProto().equals(builder.getAppId())) {
|
||||||
builder.setAppId(convertToProtoFormat(this.applicationId));
|
builder.setAppId(convertToProtoFormat(this.applicationId));
|
||||||
}
|
}
|
||||||
|
@ -109,7 +60,7 @@ public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements Co
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mergeLocalToProto() {
|
private synchronized void mergeLocalToProto() {
|
||||||
if (viaProto)
|
if (viaProto)
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
mergeLocalToBuilder();
|
mergeLocalToBuilder();
|
||||||
|
@ -117,7 +68,7 @@ public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements Co
|
||||||
viaProto = true;
|
viaProto = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void maybeInitBuilder() {
|
private synchronized void maybeInitBuilder() {
|
||||||
if (viaProto || builder == null) {
|
if (viaProto || builder == null) {
|
||||||
builder = ContainerIdProto.newBuilder(proto);
|
builder = ContainerIdProto.newBuilder(proto);
|
||||||
}
|
}
|
||||||
|
@ -126,18 +77,18 @@ public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements Co
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getId() {
|
public synchronized int getId() {
|
||||||
ContainerIdProtoOrBuilder p = viaProto ? proto : builder;
|
ContainerIdProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
return (p.getId());
|
return (p.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setId(int id) {
|
public synchronized void setId(int id) {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
builder.setId((id));
|
builder.setId((id));
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public ApplicationId getAppId() {
|
public synchronized ApplicationId getAppId() {
|
||||||
ContainerIdProtoOrBuilder p = viaProto ? proto : builder;
|
ContainerIdProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
if (this.applicationId != null) {
|
if (this.applicationId != null) {
|
||||||
return this.applicationId;
|
return this.applicationId;
|
||||||
|
@ -150,7 +101,7 @@ public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements Co
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ApplicationAttemptId getAppAttemptId() {
|
public synchronized ApplicationAttemptId getAppAttemptId() {
|
||||||
ContainerIdProtoOrBuilder p = viaProto ? proto : builder;
|
ContainerIdProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
if (this.appAttemptId != null) {
|
if (this.appAttemptId != null) {
|
||||||
return this.appAttemptId;
|
return this.appAttemptId;
|
||||||
|
@ -163,7 +114,7 @@ public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements Co
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setAppId(ApplicationId appId) {
|
public synchronized void setAppId(ApplicationId appId) {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
if (appId == null)
|
if (appId == null)
|
||||||
builder.clearAppId();
|
builder.clearAppId();
|
||||||
|
@ -171,7 +122,7 @@ public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements Co
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setAppAttemptId(ApplicationAttemptId atId) {
|
public synchronized void setAppAttemptId(ApplicationAttemptId atId) {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
if (atId == null)
|
if (atId == null)
|
||||||
builder.clearAppAttemptId();
|
builder.clearAppAttemptId();
|
||||||
|
@ -193,42 +144,4 @@ public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements Co
|
||||||
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
||||||
return ((ApplicationIdPBImpl)t).getProto();
|
return ((ApplicationIdPBImpl)t).getProto();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return getProto().hashCode();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object other) {
|
|
||||||
if (other == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
|
||||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int compareTo(ContainerId other) {
|
|
||||||
if (this.getAppAttemptId().compareTo(other.getAppAttemptId()) == 0) {
|
|
||||||
return this.getId() - other.getId();
|
|
||||||
} else {
|
|
||||||
return this.getAppAttemptId().compareTo(other.getAppAttemptId());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
ApplicationId appId = getAppId();
|
|
||||||
sb.append("container_").append(appId.getClusterTimestamp()).append("_");
|
|
||||||
sb.append(appIdFormat.get().format(appId.getId())).append("_");
|
|
||||||
sb.append(appAttemptIdFormat.get().format(getAppAttemptId().
|
|
||||||
getAttemptId())).append("_");
|
|
||||||
sb.append(containerIdFormat.get().format(getId()));
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
package org.apache.hadoop.yarn.api;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestApplicationAttemptId {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationAttemptId() {
|
||||||
|
ApplicationAttemptId a1 = createAppAttemptId(10l, 1, 1);
|
||||||
|
ApplicationAttemptId a2 = createAppAttemptId(10l, 1, 2);
|
||||||
|
ApplicationAttemptId a3 = createAppAttemptId(10l, 2, 1);
|
||||||
|
ApplicationAttemptId a4 = createAppAttemptId(8l, 1, 4);
|
||||||
|
ApplicationAttemptId a5 = createAppAttemptId(10l, 1, 1);
|
||||||
|
|
||||||
|
Assert.assertTrue(a1.equals(a5));
|
||||||
|
Assert.assertFalse(a1.equals(a2));
|
||||||
|
Assert.assertFalse(a1.equals(a3));
|
||||||
|
Assert.assertFalse(a1.equals(a4));
|
||||||
|
|
||||||
|
Assert.assertTrue(a1.compareTo(a5) == 0);
|
||||||
|
Assert.assertTrue(a1.compareTo(a2) < 0);
|
||||||
|
Assert.assertTrue(a1.compareTo(a3) < 0);
|
||||||
|
Assert.assertTrue(a1.compareTo(a4) > 0);
|
||||||
|
|
||||||
|
Assert.assertTrue(a1.hashCode() == a5.hashCode());
|
||||||
|
Assert.assertFalse(a1.hashCode() == a2.hashCode());
|
||||||
|
Assert.assertFalse(a1.hashCode() == a3.hashCode());
|
||||||
|
Assert.assertFalse(a1.hashCode() == a4.hashCode());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationAttemptId createAppAttemptId(long clusterTimeStamp,
|
||||||
|
int id, int attemptId) {
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
Records.newRecord(ApplicationAttemptId.class);
|
||||||
|
ApplicationId appId = Records.newRecord(ApplicationId.class);
|
||||||
|
appId.setClusterTimestamp(clusterTimeStamp);
|
||||||
|
appId.setId(id);
|
||||||
|
appAttemptId.setApplicationId(appId);
|
||||||
|
appAttemptId.setAttemptId(attemptId);
|
||||||
|
return appAttemptId;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package org.apache.hadoop.yarn.api;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestApplicationId {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationId() {
|
||||||
|
ApplicationId a1 = createAppId(10l, 1);
|
||||||
|
ApplicationId a2 = createAppId(10l, 2);
|
||||||
|
ApplicationId a3 = createAppId(10l, 1);
|
||||||
|
ApplicationId a4 = createAppId(8l, 3);
|
||||||
|
|
||||||
|
Assert.assertFalse(a1.equals(a2));
|
||||||
|
Assert.assertFalse(a1.equals(a4));
|
||||||
|
Assert.assertTrue(a1.equals(a3));
|
||||||
|
|
||||||
|
Assert.assertTrue(a1.compareTo(a2) < 0);
|
||||||
|
Assert.assertTrue(a1.compareTo(a3) == 0);
|
||||||
|
Assert.assertTrue(a1.compareTo(a4) > 0);
|
||||||
|
|
||||||
|
Assert.assertTrue(a1.hashCode() == a3.hashCode());
|
||||||
|
Assert.assertFalse(a1.hashCode() == a2.hashCode());
|
||||||
|
Assert.assertFalse(a2.hashCode() == a4.hashCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationId createAppId(long clusterTimeStamp, int id) {
|
||||||
|
ApplicationId appId = Records.newRecord(ApplicationId.class);
|
||||||
|
appId.setClusterTimestamp(clusterTimeStamp);
|
||||||
|
appId.setId(id);
|
||||||
|
return appId;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,64 @@
|
||||||
|
package org.apache.hadoop.yarn.api;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestContainerId {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerId() {
|
||||||
|
ContainerId c1 = createContainerId(10l, 1, 1, 1);
|
||||||
|
ContainerId c2 = createContainerId(10l, 1, 1, 2);
|
||||||
|
ContainerId c3 = createContainerId(10l, 1, 1, 1);
|
||||||
|
ContainerId c4 = createContainerId(10l, 1, 3, 1);
|
||||||
|
ContainerId c5 = createContainerId(8l, 1, 3, 1);
|
||||||
|
|
||||||
|
Assert.assertTrue(c1.equals(c3));
|
||||||
|
Assert.assertFalse(c1.equals(c2));
|
||||||
|
Assert.assertFalse(c1.equals(c4));
|
||||||
|
Assert.assertFalse(c1.equals(c5));
|
||||||
|
|
||||||
|
Assert.assertTrue(c1.compareTo(c3) == 0);
|
||||||
|
Assert.assertTrue(c1.compareTo(c2) < 0);
|
||||||
|
Assert.assertTrue(c1.compareTo(c4) < 0);
|
||||||
|
Assert.assertTrue(c1.compareTo(c5) > 0);
|
||||||
|
|
||||||
|
Assert.assertTrue(c1.hashCode() == c3.hashCode());
|
||||||
|
Assert.assertFalse(c1.hashCode() == c2.hashCode());
|
||||||
|
Assert.assertFalse(c1.hashCode() == c4.hashCode());
|
||||||
|
Assert.assertFalse(c1.hashCode() == c5.hashCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private ContainerId createContainerId(long clusterTimestamp, int appIdInt,
|
||||||
|
int appAttemptIdInt, int containerIdInt) {
|
||||||
|
ApplicationId appId = createAppId(clusterTimestamp, appIdInt);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
createAppAttemptId(appId, appAttemptIdInt);
|
||||||
|
ContainerId containerId = Records.newRecord(ContainerId.class);
|
||||||
|
containerId.setAppAttemptId(appAttemptId);
|
||||||
|
containerId.setAppId(appId);
|
||||||
|
containerId.setId(containerIdInt);
|
||||||
|
return containerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationId createAppId(long clusterTimeStamp, int id) {
|
||||||
|
ApplicationId appId = Records.newRecord(ApplicationId.class);
|
||||||
|
appId.setClusterTimestamp(clusterTimeStamp);
|
||||||
|
appId.setId(id);
|
||||||
|
return appId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationAttemptId createAppAttemptId(ApplicationId appId,
|
||||||
|
int attemptId) {
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
Records.newRecord(ApplicationAttemptId.class);
|
||||||
|
appAttemptId.setApplicationId(appId);
|
||||||
|
appAttemptId.setAttemptId(attemptId);
|
||||||
|
return appAttemptId;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue