merge -r 1404176:1404177 from trunk. FIXES: MAPREDUCE-4752

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1404182 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 2012-10-31 15:02:59 +00:00
parent c092ee4c4c
commit b958fa85a5
23 changed files with 79 additions and 44 deletions
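
The pattern throughout this patch: strings produced by RPC deserialization or XML parsing are equal in content but distinct objects on the heap, and a long-running MR ApplicationMaster accumulates thousands of such duplicates (hostnames, counter names, state strings). A minimal sketch of the problem and the fix, using the StringInterner touched below (the demo class itself is hypothetical):

import org.apache.hadoop.util.StringInterner;

public class InterningDemo {
  public static void main(String[] args) {
    // Simulate two deserialized strings with identical content.
    String a = new String("host1234.rack7.example.com:8080");
    String b = new String("host1234.rack7.example.com:8080");

    System.out.println(a == b); // false: two separate heap objects
    System.out.println(
        StringInterner.weakIntern(a) == StringInterner.weakIntern(b));
    // true: both callers now share one canonical instance
  }
}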

Configuration.java

@@ -75,6 +75,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
@@ -1964,13 +1965,16 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
           continue;
         Element field = (Element)fieldNode;
         if ("name".equals(field.getTagName()) && field.hasChildNodes())
-          attr = ((Text)field.getFirstChild()).getData().trim();
+          attr = StringInterner.weakIntern(
+              ((Text)field.getFirstChild()).getData().trim());
         if ("value".equals(field.getTagName()) && field.hasChildNodes())
-          value = ((Text)field.getFirstChild()).getData();
+          value = StringInterner.weakIntern(
+              ((Text)field.getFirstChild()).getData());
         if ("final".equals(field.getTagName()) && field.hasChildNodes())
           finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
         if ("source".equals(field.getTagName()) && field.hasChildNodes())
-          source.add(((Text)field.getFirstChild()).getData());
+          source.add(StringInterner.weakIntern(
+              ((Text)field.getFirstChild()).getData()));
       }
       source.add(name);

StringInterner.java

@@ -59,6 +59,9 @@ public class StringInterner {
    * @return strong reference to interned string instance
    */
   public static String strongIntern(String sample) {
+    if (sample == null) {
+      return null;
+    }
     return strongInterner.intern(sample);
   }
@@ -72,6 +75,9 @@ public class StringInterner {
    * @return weak reference to interned string instance
    */
   public static String weakIntern(String sample) {
+    if (sample == null) {
+      return null;
+    }
     return weakInterner.intern(sample);
   }
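
The null guards added above matter because StringInterner delegates to interner objects (the strongInterner/weakInterner fields visible in the hunk), and an interner's intern() rejects null. A sketch of the plausible Guava-based backing (assumed here; the real class is authoritative):

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

// Sketch only: approximates what StringInterner likely wraps.
public class StringInternerSketch {
  // Strong interner: canonical strings are pinned for the life of the JVM.
  private static final Interner<String> strongInterner =
      Interners.newStrongInterner();
  // Weak interner: canonical strings can be garbage-collected once unused.
  private static final Interner<String> weakInterner =
      Interners.newWeakInterner();

  public static String strongIntern(String sample) {
    return sample == null ? null : strongInterner.intern(sample);
  }

  public static String weakIntern(String sample) {
    return sample == null ? null : weakInterner.intern(sample);
  }
}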

CHANGES.txt

@@ -434,6 +434,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol
     for Job, Task and TaskAttempt. (Siddarth Seth via vinodkv)
 
+    MAPREDUCE-4752. Reduce MR AM memory usage through String Interning (Robert
+    Evans via tgraves)
+
   OPTIMIZATIONS
 
   BUG FIXES

TaskAttemptListenerImpl.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.CompositeService;
@@ -280,6 +281,7 @@ public class TaskAttemptListenerImpl extends CompositeService
   @Override
   public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
       throws IOException {
+    diagnosticInfo = StringInterner.weakIntern(diagnosticInfo);
     LOG.info("Diagnostics report from " + taskAttemptID.toString() + ": "
         + diagnosticInfo);

TaskAttemptImpl.java

@@ -105,6 +105,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
@@ -940,7 +941,6 @@ public abstract class TaskAttemptImpl implements
       Counters counters = reportedStatus.counters;
       if (counters == null) {
         counters = EMPTY_COUNTERS;
-        // counters.groups = new HashMap<String, CounterGroup>();
       }
       return counters;
     } finally {
@@ -1262,9 +1262,10 @@ public abstract class TaskAttemptImpl implements
           (TaskAttemptContainerAssignedEvent) event;
       taskAttempt.containerID = cEvent.getContainer().getId();
       taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
-      taskAttempt.containerMgrAddress = taskAttempt.containerNodeId
-          .toString();
-      taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress();
+      taskAttempt.containerMgrAddress = StringInterner.weakIntern(
+          taskAttempt.containerNodeId.toString());
+      taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
+          cEvent.getContainer().getNodeHttpAddress());
       taskAttempt.nodeRackName = RackResolver.resolve(
           taskAttempt.containerNodeId.getHost()).getNetworkLocation();
       taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
@@ -1710,7 +1711,6 @@ public abstract class TaskAttemptImpl implements
       result.stateString = "NEW";
       result.taskState = TaskAttemptState.NEW;
       Counters counters = EMPTY_COUNTERS;
-      // counters.groups = new HashMap<String, CounterGroup>();
       result.counters = counters;
     }

TaskImpl.java

@@ -75,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -667,9 +668,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         .newRecordInstance(TaskAttemptCompletionEvent.class);
     tce.setEventId(-1);
     String scheme = (encryptedShuffle) ? "https://" : "http://";
-    tce.setMapOutputServerAddress(scheme
+    tce.setMapOutputServerAddress(StringInterner.weakIntern(scheme
         + attempt.getNodeHttpAddress().split(":")[0] + ":"
-        + attempt.getShufflePort());
+        + attempt.getShufflePort()));
     tce.setStatus(status);
     tce.setAttemptId(attempt.getID());
     int runTime = 0;

RMContainerAllocator.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -620,7 +621,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         eventHandler.handle(new TaskAttemptEvent(attemptID,
             TaskAttemptEventType.TA_CONTAINER_COMPLETED));
         // Send the diagnostics
-        String diagnostics = cont.getDiagnostics();
+        String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
             diagnostics));
       }

ClusterStatus.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.util.StringInterner;
 
 /**
  * Status information on the current state of the Map-Reduce cluster.
@@ -141,9 +142,9 @@ public class ClusterStatus implements Writable {
     @Override
     public void readFields(DataInput in) throws IOException {
-      trackerName = Text.readString(in);
-      reasonForBlackListing = Text.readString(in);
-      blackListReport = Text.readString(in);
+      trackerName = StringInterner.weakIntern(Text.readString(in));
+      reasonForBlackListing = StringInterner.weakIntern(Text.readString(in));
+      blackListReport = StringInterner.weakIntern(Text.readString(in));
     }
 
     @Override
@@ -429,7 +430,7 @@ public class ClusterStatus implements Writable {
     int numTrackerNames = in.readInt();
     if (numTrackerNames > 0) {
       for (int i = 0; i < numTrackerNames; i++) {
-        String name = Text.readString(in);
+        String name = StringInterner.weakIntern(Text.readString(in));
         activeTrackers.add(name);
       }
     }
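
The readFields changes here and in the files that follow are all one idiom: route every string that recurs across many deserialized instances through weakIntern. A self-contained sketch of the idiom on a hypothetical Writable (AttemptSummary is illustrative, not part of the patch):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringInterner;

public class AttemptSummary implements Writable {
  private String host;  // repeats for every attempt on the same node
  private String state; // drawn from a small, fixed set of status strings

  @Override
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, host);
    Text.writeString(out, state);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // Intern on the read path: each unique value is stored once,
    // however many records are deserialized.
    host = StringInterner.weakIntern(Text.readString(in));
    state = StringInterner.weakIntern(Text.readString(in));
  }
}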

JobProfile.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.util.StringInterner;
 
 /**************************************************
  * A JobProfile is a MapReduce primitive. Tracks a job,
@@ -176,11 +177,11 @@ public class JobProfile implements Writable {
   public void readFields(DataInput in) throws IOException {
     jobid.readFields(in);
-    this.jobFile = Text.readString(in);
-    this.url = Text.readString(in);
-    this.user = Text.readString(in);
-    this.name = Text.readString(in);
-    this.queueName = Text.readString(in);
+    this.jobFile = StringInterner.weakIntern(Text.readString(in));
+    this.url = StringInterner.weakIntern(Text.readString(in));
+    this.user = StringInterner.weakIntern(Text.readString(in));
+    this.name = StringInterner.weakIntern(Text.readString(in));
+    this.queueName = StringInterner.weakIntern(Text.readString(in));
   }
 }

MapTask.java

@@ -67,6 +67,7 @@ import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 
 /** A Map task. */
@@ -343,7 +344,7 @@ class MapTask extends Task {
       FileSystem fs = file.getFileSystem(conf);
       FSDataInputStream inFile = fs.open(file);
       inFile.seek(offset);
-      String className = Text.readString(inFile);
+      String className = StringInterner.weakIntern(Text.readString(inFile));
       Class<T> cls;
       try {
         cls = (Class<T>) conf.getClassByName(className);

Task.java

@@ -67,6 +67,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -467,7 +468,7 @@ abstract public class Task implements Writable, Configurable {
   }
 
   public void readFields(DataInput in) throws IOException {
-    jobFile = Text.readString(in);
+    jobFile = StringInterner.weakIntern(Text.readString(in));
     taskId = TaskAttemptID.read(in);
     partition = in.readInt();
     numSlotsRequired = in.readInt();
@@ -487,7 +488,7 @@ abstract public class Task implements Writable, Configurable {
     if (taskCleanup) {
      setPhase(TaskStatus.Phase.CLEANUP);
     }
-    user = Text.readString(in);
+    user = StringInterner.weakIntern(Text.readString(in));
     extraData.readFields(in);
   }

TaskStatus.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 
 /**************************************************
  * Describes the current status of a task. This is
@@ -477,8 +478,8 @@ public abstract class TaskStatus implements Writable, Cloneable {
     setProgress(in.readFloat());
     this.numSlots = in.readInt();
     this.runState = WritableUtils.readEnum(in, State.class);
-    setDiagnosticInfo(Text.readString(in));
-    setStateString(Text.readString(in));
+    setDiagnosticInfo(StringInterner.weakIntern(Text.readString(in)));
+    setStateString(StringInterner.weakIntern(Text.readString(in)));
     this.phase = WritableUtils.readEnum(in, Phase.class);
     this.startTime = in.readLong();
     this.finishTime = in.readLong();

TaggedInputSplit.java (org.apache.hadoop.mapred.lib)

@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
 
 /**
  * An {@link InputSplit} that tags another InputSplit with extra data for use
@@ -114,7 +115,7 @@ class TaggedInputSplit implements Configurable, InputSplit {
   }
 
   private Class<?> readClass(DataInput in) throws IOException {
-    String className = Text.readString(in);
+    String className = StringInterner.weakIntern(Text.readString(in));
     try {
       return conf.getClassByName(className);
     } catch (ClassNotFoundException e) {

JobStatus.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.StringInterner;
 
 /**************************************************
  * Describes the current status of a job.
@@ -456,15 +457,15 @@ public class JobStatus implements Writable, Cloneable {
     this.cleanupProgress = in.readFloat();
     this.runState = WritableUtils.readEnum(in, State.class);
     this.startTime = in.readLong();
-    this.user = Text.readString(in);
+    this.user = StringInterner.weakIntern(Text.readString(in));
     this.priority = WritableUtils.readEnum(in, JobPriority.class);
-    this.schedulingInfo = Text.readString(in);
+    this.schedulingInfo = StringInterner.weakIntern(Text.readString(in));
     this.finishTime = in.readLong();
     this.isRetired = in.readBoolean();
-    this.historyFile = Text.readString(in);
-    this.jobName = Text.readString(in);
-    this.trackingUrl = Text.readString(in);
-    this.jobFile = Text.readString(in);
+    this.historyFile = StringInterner.weakIntern(Text.readString(in));
+    this.jobName = StringInterner.weakIntern(Text.readString(in));
+    this.trackingUrl = StringInterner.weakIntern(Text.readString(in));
+    this.jobFile = StringInterner.weakIntern(Text.readString(in));
     this.isUber = in.readBoolean();
 
     // De-serialize the job's ACLs

QueueAclsInfo.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringInterner;
 
 /**
  * Class to encapsulate Queue ACLs for a particular
@@ -82,7 +83,7 @@ public class QueueAclsInfo implements Writable {
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    queueName = Text.readString(in);
+    queueName = StringInterner.weakIntern(Text.readString(in));
     operations = WritableUtils.readStringArray(in);
   }

QueueInfo.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringInterner;
 
 /**
  * Class that contains the information regarding the Job Queues which are
@@ -190,9 +191,9 @@ public class QueueInfo implements Writable {
   @Override
   public void readFields(DataInput in) throws IOException {
-    queueName = Text.readString(in);
+    queueName = StringInterner.weakIntern(Text.readString(in));
     queueState = WritableUtils.readEnum(in, QueueState.class);
-    schedulingInfo = Text.readString(in);
+    schedulingInfo = StringInterner.weakIntern(Text.readString(in));
     int length = in.readInt();
     stats = new JobStatus[length];
     for (int i = 0; i < length; i++) {

TaskReport.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.TIPStatus;
+import org.apache.hadoop.util.StringInterner;
 
 /** A report on the state of a task. */
 @InterfaceAudience.Private
@@ -208,7 +209,7 @@ public class TaskReport implements Writable {
   public void readFields(DataInput in) throws IOException {
     this.taskid.readFields(in);
     this.progress = in.readFloat();
-    this.state = Text.readString(in);
+    this.state = StringInterner.weakIntern(Text.readString(in));
     this.startTime = in.readLong();
     this.finishTime = in.readLong();

AbstractCounterGroup.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.util.ResourceBundles;
+import org.apache.hadoop.util.StringInterner;
 
 import com.google.common.collect.Iterators;
@@ -164,7 +165,7 @@ public abstract class AbstractCounterGroup<T extends Counter>
   @Override
   public synchronized void readFields(DataInput in) throws IOException {
-    displayName = Text.readString(in);
+    displayName = StringInterner.weakIntern(Text.readString(in));
     counters.clear();
     int size = WritableUtils.readVInt(in);
     for (int i = 0; i < size; i++) {

AbstractCounters.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.StringInterner;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
@@ -308,7 +309,8 @@ public abstract class AbstractCounters<C extends Counter,
     int numGroups = WritableUtils.readVInt(in);
     while (numGroups-- > 0) {
       limits.checkGroups(groups.size() + 1);
-      G group = groupFactory.newGenericGroup(Text.readString(in), null, limits);
+      G group = groupFactory.newGenericGroup(
+          StringInterner.weakIntern(Text.readString(in)), null, limits);
       group.readFields(in);
       groups.put(group.getName(), group);
     }
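
Counter names are likely the heaviest repeat offenders in the AM: every task status update carries a full Counters tree, so the same group and counter names arrive again for each attempt. A rough, illustrative calculation (the numbers are assumptions, not measurements from this patch):

public class CounterDupEstimate {
  public static void main(String[] args) {
    // Illustrative numbers only: 10000 attempts, ~50 counters each, names
    // averaging 40 chars; an uninterned String costs about 40 chars * 2 bytes
    // plus roughly 40 bytes of object and array overhead.
    long attempts = 10000, countersEach = 50;
    long bytesPerName = 40 * 2 + 40;
    System.out.println((attempts * countersEach * bytesPerName) / (1024 * 1024)
        + " MB of duplicate counter-name strings; interned, a few KB total");
  }
}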

GenericCounter.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.util.StringInterner;
 
 /**
  * A generic counter implementation
@@ -59,8 +60,9 @@ public class GenericCounter extends AbstractCounter {
 
   @Override
   public synchronized void readFields(DataInput in) throws IOException {
-    name = Text.readString(in);
-    displayName = in.readBoolean() ? Text.readString(in) : name;
+    name = StringInterner.weakIntern(Text.readString(in));
+    displayName = in.readBoolean() ?
+        StringInterner.weakIntern(Text.readString(in)) : name;
     value = WritableUtils.readVLong(in);
   }

JobHistoryParser.java

@@ -269,7 +269,7 @@ public class JobHistoryParser {
     TaskAttemptInfo attemptInfo =
         taskInfo.attemptsMap.get(event.getTaskAttemptId());
     attemptInfo.finishTime = event.getFinishTime();
-    attemptInfo.error = event.getError();
+    attemptInfo.error = StringInterner.weakIntern(event.getError());
     attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus());
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
     attemptInfo.port = event.getPort();
@@ -326,7 +326,7 @@ public class JobHistoryParser {
     TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
     taskInfo.status = TaskStatus.State.FAILED.toString();
     taskInfo.finishTime = event.getFinishTime();
-    taskInfo.error = event.getError();
+    taskInfo.error = StringInterner.weakIntern(event.getError());
     taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
     info.errorInfo = "Task " + taskInfo.taskId +" failed " +
         taskInfo.attemptsMap.size() + " times ";

TaggedInputSplit.java (org.apache.hadoop.mapreduce.lib.input)

@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
 
 /**
  * An {@link InputSplit} that tags another InputSplit with extra data for use
@@ -128,7 +129,7 @@ class TaggedInputSplit extends InputSplit implements Configurable, Writable {
   }
 
   private Class<?> readClass(DataInput in) throws IOException {
-    String className = Text.readString(in);
+    String className = StringInterner.weakIntern(Text.readString(in));
     try {
       return conf.getClassByName(className);
     } catch (ClassNotFoundException e) {

Apps.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.util;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -109,6 +110,7 @@ public class Apps {
     } else {
       val = val + SYSTEM_PATH_SEPARATOR + value;
     }
-    environment.put(variable, val);
+    environment.put(StringInterner.weakIntern(variable),
+        StringInterner.weakIntern(val));
   }
 }
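
Every call site in this commit chooses weakIntern over strongIntern. That is the sensible default for values like hostnames, diagnostics, and environment entries, which form an unbounded vocabulary over the life of the AM: weak references let a canonical string be collected once the last holder drops it, so the intern table cannot grow without bound. A short illustration of the rule of thumb (the values below are examples, not from the patch):

import org.apache.hadoop.util.StringInterner;

public class InternChoice {
  public static void main(String[] args) {
    // weakIntern: unbounded, recurring values; entries are GC-eligible
    // once no live object references them.
    String host = StringInterner.weakIntern("host42.example.com");
    // strongIntern: reserve for small, fixed vocabularies, since entries
    // stay pinned for the life of the JVM.
    String state = StringInterner.strongIntern("SUCCEEDED");
    System.out.println(host + " " + state);
  }
}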