MAPREDUCE-3567. Extraneous JobConf objects in AM heap. Contributed by Vinod Kumar Vavilapalli)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1222498 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2011-12-22 22:34:40 +00:00
parent ef9b974879
commit 8fa0a3c737
13 changed files with 50 additions and 46 deletions

View File

@ -169,6 +169,9 @@ Release 0.23.1 - Unreleased
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
Vavilapalli via sseth)
BUG FIXES
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob

View File

@ -29,9 +29,9 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.mapreduce.JobContext;
@ -41,11 +41,11 @@ import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -35,13 +34,14 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class MapTaskAttemptImpl extends TaskAttemptImpl {
private final TaskSplitMetaInfo splitInfo;
public MapTaskAttemptImpl(TaskId taskId, int attempt,
EventHandler eventHandler, Path jobFile,
int partition, TaskSplitMetaInfo splitInfo, Configuration conf,
int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -34,14 +33,14 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class ReduceTaskAttemptImpl extends TaskAttemptImpl {
private final int numMapTasks;
public ReduceTaskAttemptImpl(TaskId id, int attempt,
EventHandler eventHandler, Path jobFile, int partition,
int numMapTasks, Configuration conf,
int numMapTasks, JobConf conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {

View File

@ -54,7 +54,6 @@ import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@ -110,6 +109,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
/** Implementation of Job interface. Maintains the state machines of Job.
* The read and write calls use ReadWriteLock for concurrency.
*/
@SuppressWarnings({ "rawtypes", "deprecation" })
public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
EventHandler<JobEvent> {
@ -154,7 +154,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// Can then replace task-level uber counters (MR-2424) with job-level ones
// sent from LocalContainerLauncher, and eventually including a count of
// of uber-AM attempts (probably sent from MRAppMaster).
public Configuration conf;
public JobConf conf;
//fields initialized in init
private FileSystem fs;
@ -371,7 +371,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
this.applicationAttemptId = applicationAttemptId;
this.jobId = jobId;
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
this.conf = conf;
this.conf = new JobConf(conf);
this.metrics = metrics;
this.clock = clock;
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
@ -979,7 +979,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.oldJobId);
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
new JobConf(job.conf), job.oldJobId);
job.conf, job.oldJobId);
}
long inputLength = 0;

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.Collection;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -31,20 +31,20 @@ import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class MapTaskImpl extends TaskImpl {
private final TaskSplitMetaInfo taskSplitMetaInfo;
public MapTaskImpl(JobId jobId, int partition, EventHandler eventHandler,
Path remoteJobConfFile, Configuration conf,
Path remoteJobConfFile, JobConf conf,
TaskSplitMetaInfo taskSplitMetaInfo,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.Collection;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -30,19 +30,20 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class ReduceTaskImpl extends TaskImpl {
private final int numMapTasks;
public ReduceTaskImpl(JobId jobId, int partition,
EventHandler eventHandler, Path jobFile, Configuration conf,
EventHandler eventHandler, Path jobFile, JobConf conf,
int numMapTasks, TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,

View File

@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.util.RackResolver;
/**
* Implementation of TaskAttempt interface.
*/
@SuppressWarnings({ "rawtypes", "deprecation" })
public abstract class TaskAttemptImpl implements
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
EventHandler<TaskAttemptEvent> {
@ -135,10 +136,9 @@ public abstract class TaskAttemptImpl implements
private static final int REDUCE_MEMORY_MB_DEFAULT = 1024;
private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected final Configuration conf;
protected final JobConf conf;
protected final Path jobFile;
protected final int partition;
@SuppressWarnings("rawtypes")
protected final EventHandler eventHandler;
private final TaskAttemptId attemptId;
private final Clock clock;
@ -445,9 +445,9 @@ public abstract class TaskAttemptImpl implements
.getProperty("line.separator");
public TaskAttemptImpl(TaskId taskId, int i,
@SuppressWarnings("rawtypes") EventHandler eventHandler,
EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
Configuration conf, String[] dataLocalHosts, OutputCommitter committer,
JobConf conf, String[] dataLocalHosts, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
oldJobId = TypeConverter.fromYarn(taskId.getJobId());
@ -1199,7 +1199,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEvent event) {
@SuppressWarnings("deprecation")
TaskAttemptContext taskContext =
new TaskAttemptContextImpl(new JobConf(taskAttempt.conf),
new TaskAttemptContextImpl(taskAttempt.conf,
TypeConverter.fromYarn(taskAttempt.attemptId));
taskAttempt.eventHandler.handle(new TaskCleanupEvent(
taskAttempt.attemptId,

View File

@ -31,8 +31,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -50,8 +50,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@ -66,6 +64,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
@ -81,11 +81,12 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
/**
* Implementation of Task interface.
*/
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final Log LOG = LogFactory.getLog(TaskImpl.class);
protected final Configuration conf;
protected final JobConf conf;
protected final Path jobFile;
protected final OutputCommitter committer;
protected final int partition;
@ -225,7 +226,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
public TaskImpl(JobId jobId, TaskType taskType, int partition,
EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,

View File

@ -119,6 +119,10 @@ public class MRApp extends MRAppMaster {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
}
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
}
private static ApplicationAttemptId getApplicationAttemptId(
ApplicationId applicationId, int startCount) {
ApplicationAttemptId applicationAttemptId =

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -169,7 +168,7 @@ public class MRAppBenchmark {
}
public void benchmark1() throws Exception {
int maps = 900;
int maps = 100000;
int reduces = 100;
System.out.println("Running benchmark with maps:"+maps +
" reduces:"+reduces);

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -29,7 +29,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
@ -60,11 +59,12 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class TestTaskImpl {
private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);
private Configuration conf;
private JobConf conf;
private TaskAttemptListener taskAttemptListener;
private OutputCommitter committer;
private Token<JobTokenIdentifier> jobToken;
@ -91,9 +91,8 @@ public class TestTaskImpl {
private int taskAttemptCounter = 0;
@SuppressWarnings("rawtypes")
public MockTaskImpl(JobId jobId, int partition,
EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
@ -132,10 +131,9 @@ public class TestTaskImpl {
private TaskAttemptState state = TaskAttemptState.NEW;
private TaskAttemptId attemptId;
@SuppressWarnings("rawtypes")
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
Configuration conf, OutputCommitter committer,
JobConf conf, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
@ -175,7 +173,6 @@ public class TestTaskImpl {
private class MockTask extends Task {
@Override
@SuppressWarnings("deprecation")
public void run(JobConf job, TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
return;
@ -195,7 +192,7 @@ public class TestTaskImpl {
++startCount;
conf = new Configuration();
conf = new JobConf();
taskAttemptListener = mock(TaskAttemptListener.class);
committer = mock(OutputCommitter.class);
jobToken = (Token<JobTokenIdentifier>) mock(Token.class);

View File

@ -75,7 +75,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
LOG.info("AsyncDispatcher thread interrupted", ie);
LOG.warn("AsyncDispatcher thread interrupted", ie);
return;
}
if (event != null) {
@ -114,8 +114,10 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
@SuppressWarnings("unchecked")
protected void dispatch(Event event) {
//all events go thru this loop
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
}
Class<? extends Enum> type = event.getType().getDeclaringClass();
@ -131,12 +133,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
}
@SuppressWarnings("unchecked")
@Override
@SuppressWarnings("rawtypes")
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
@SuppressWarnings("unchecked")
EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
@ -170,7 +171,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.info("Very low remaining capacity in the event-queue: "
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
@ -186,7 +187,6 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
* are interested in the event.
* @param <T> the type of event these multiple handlers are interested in.
*/
@SuppressWarnings("rawtypes")
static class MultiListenerHandler implements EventHandler<Event> {
List<EventHandler<Event>> listofHandlers;