Revert "MAPREDUCE-5875. Make Counter limits consistent across JobClient, MRAppMaster, and YarnChild. (Gera Shegalov via kasha)"

This reverts commit 7bfd9e068d.

Conflicts:
	hadoop-mapreduce-project/CHANGES.txt
Vinod Kumar Vavilapalli, 2015-03-31 11:30:04 -07:00
parent 1c050bd135
commit ec123abdc9
8 changed files with 22 additions and 128 deletions
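
Editor's note: in effect, this revert drops the per-job counter-limit plumbing that MAPREDUCE-5875 added. MRAppMaster no longer calls Limits.init() with the AM's configuration, JobSubmitter no longer calls Limits.reset() at submission, and Limits loses its reset() method, so counter limits are again fixed by whichever configuration first initializes the Limits class in a JVM. A minimal sketch of the difference, with the Limits calls taken from the hunks below; the wrapper class, main() method, and literal key (assumed to be what MRJobConfig.COUNTER_GROUPS_MAX_KEY resolves to) are illustrative only:

    import org.apache.hadoop.conf.Configuration;

    public class CounterLimitsSketch {
      public static void main(String[] args) {
        // A job-specific counter limit of the kind the reverted code honored.
        Configuration jobConf = new Configuration();
        jobConf.setInt("mapreduce.job.counters.groups.max", 200);

        // Calls removed by this revert (see JobSubmitter.java, MRAppMaster.java
        // and Limits.java below):
        //   Limits.reset(jobConf);      // at job submission
        //   Limits.init(getConfig());   // in the MRAppMaster
        // Without them, Limits initializes statically once per JVM, so the
        // first configuration read in a process fixes the limits for all jobs.
      }
    }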

hadoop-mapreduce-project/CHANGES.txt

@@ -405,9 +405,6 @@ Release 2.6.0 - 2014-11-18
     MAPREDUCE-6123. TestCombineFileInputFormat incorrectly starts 2
     MiniDFSCluster instances. (cnauroth)
 
-    MAPREDUCE-5875. Make Counter limits consistent across JobClient,
-    MRAppMaster, and YarnChild. (Gera Shegalov via kasha)
-
     MAPREDUCE-6125. TestContainerLauncherImpl sometimes fails (Mit Desai via
     jlowe)
@@ -453,7 +450,6 @@ Release 2.5.2 - 2014-11-19
   BUG FIXES
 
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

MRAppMaster.java

@@ -54,7 +54,6 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.counters.Limits;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.EventReader;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
@@ -1090,8 +1089,6 @@ public class MRAppMaster extends CompositeService {
     // finally set the job classloader
     MRApps.setClassLoader(jobClassLoader, getConfig());
-    // set job classloader if configured
-    Limits.init(getConfig());
 
     if (initFailed) {
       JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);

Cluster.java

@@ -183,15 +183,15 @@ public class Cluster {
   public Job getJob(JobID jobId) throws IOException, InterruptedException {
     JobStatus status = client.getJobStatus(jobId);
     if (status != null) {
-      final JobConf conf = new JobConf();
-      final Path jobPath = new Path(client.getFilesystemName(),
-          status.getJobFile());
-      final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf());
+      JobConf conf;
       try {
-        conf.addResource(fs.open(jobPath), jobPath.toString());
-      } catch (FileNotFoundException fnf) {
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("Job conf missing on cluster", fnf);
+        conf = new JobConf(status.getJobFile());
+      } catch (RuntimeException ex) {
+        // If job file doesn't exist it means we can't find the job
+        if (ex.getCause() instanceof FileNotFoundException) {
+          return null;
+        } else {
+          throw ex;
         }
       }
       return Job.getInstance(this, status, conf);
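
Note on the restored contract: getJob() now builds the JobConf directly from status.getJobFile() and maps a missing job file (a RuntimeException whose cause is FileNotFoundException) to a null return, instead of logging a warning and proceeding with an empty conf. A fragment showing how a caller consumes this, assuming an initialized org.apache.hadoop.mapreduce.Cluster named cluster and a JobID named jobId; checked exceptions propagate to the caller:

    Job job = cluster.getJob(jobId);
    if (job == null) {
      // Restored behavior: the job file is gone, so the job cannot be loaded.
      System.err.println("Job " + jobId + " not found on the cluster");
    } else {
      System.out.println("Job state: " + job.getJobState());
    }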

JobSubmitter.java

@@ -51,7 +51,6 @@ import org.apache.hadoop.mapred.QueueACL;
 import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 
-import org.apache.hadoop.mapreduce.counters.Limits;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
@@ -236,7 +235,6 @@ class JobSubmitter {
       // Write job file to submit dir
       writeConf(conf, submitJobFile);
-      Limits.reset(conf);
 
       //
       // Now, actually submit the job (using the submit name)

Limits.java

@@ -123,9 +123,4 @@ public class Limits {
   public synchronized LimitExceededException violation() {
     return firstViolation;
   }
-
-  public static synchronized void reset(Configuration conf) {
-    isInited = false;
-    init(conf);
-  }
 }
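
The deleted reset() is the re-initializable static singleton pattern: flip the init flag, then run init() again with a new configuration. A generic, self-contained sketch of the same idea, with illustrative names and java.util.Properties standing in for Hadoop's Configuration:

    // Illustrative only; mirrors the shape of the deleted method.
    public final class ReinitializableLimits {
      private static boolean isInited = false;
      private static int groupsMax;

      public static synchronized void init(java.util.Properties conf) {
        if (!isInited) {
          groupsMax = Integer.parseInt(conf.getProperty("groups.max", "50"));
          isInited = true;
        }
      }

      // The capability this revert removes: forcing init() to re-run with a
      // different configuration, e.g. once per submitted job.
      public static synchronized void reset(java.util.Properties conf) {
        isInited = false;
        init(conf);
      }

      public static synchronized int getGroupsMax() {
        return groupsMax;
      }
    }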

HistoryViewer.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.text.DecimalFormat;
 import java.text.Format;
@@ -30,8 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -44,7 +41,6 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.counters.Limits;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.util.HostUtil;
@@ -58,8 +54,7 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class HistoryViewer {
-  private static final Log LOG = LogFactory.getLog(HistoryViewer.class);
-  private static final SimpleDateFormat dateFormat =
+  private static SimpleDateFormat dateFormat =
     new SimpleDateFormat("d-MMM-yyyy HH:mm:ss");
   private FileSystem fs;
   private JobInfo job;
@@ -88,17 +83,6 @@ public class HistoryViewer {
         System.err.println("Ignore unrecognized file: " + jobFile.getName());
         throw new IOException(errorMsg);
       }
-      final Path jobConfPath = new Path(jobFile.getParent(), jobDetails[0]
-          + "_" + jobDetails[1] + "_" + jobDetails[2] + "_conf.xml");
-      final Configuration jobConf = new Configuration(conf);
-      try {
-        jobConf.addResource(fs.open(jobConfPath), jobConfPath.toString());
-        Limits.reset(conf);
-      } catch (FileNotFoundException fnf) {
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("Missing job conf in history", fnf);
-        }
-      }
       JobHistoryParser parser = new JobHistoryParser(fs, jobFile);
       job = parser.parse();
       jobId = job.getJobId().toString();
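
Both history readers remove the same recovery step in this revert: overlay a job's archived _conf.xml onto a Configuration, then reset the counter limits from it, tolerating a missing file. For reference, its shape was as follows (a fragment; fs, conf, LOG, and jobConfPath are assumed to be in scope, as they were at the deleted call sites here and in CompletedJob.java below):

    Configuration jobConf = new Configuration(conf);
    try {
      // Read the job's own limits from the archived conf next to the history file.
      jobConf.addResource(fs.open(jobConfPath), jobConfPath.toString());
    } catch (FileNotFoundException fnf) {
      LOG.warn("Missing job conf in history", fnf);
    }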

CompletedJob.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.mapreduce.v2.hs;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -35,7 +34,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
@@ -43,7 +41,6 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.counters.Limits;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -344,21 +341,9 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
     if (historyFileAbsolute != null) {
       JobHistoryParser parser = null;
       try {
-        final FileSystem fs = historyFileAbsolute.getFileSystem(conf);
         parser =
             new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
                 historyFileAbsolute);
-        final Path jobConfPath = new Path(historyFileAbsolute.getParent(),
-            JobHistoryUtils.getIntermediateConfFileName(jobId));
-        final Configuration conf = new Configuration();
-        try {
-          conf.addResource(fs.open(jobConfPath), jobConfPath.toString());
-          Limits.reset(conf);
-        } catch (FileNotFoundException fnf) {
-          if (LOG.isWarnEnabled()) {
-            LOG.warn("Missing job conf in history", fnf);
-          }
-        }
         this.jobInfo = parser.parse();
       } catch (IOException e) {
         throw new YarnRuntimeException("Could not load history file "

TestMRJobs.java

@@ -54,14 +54,10 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
@@ -112,7 +108,6 @@ public class TestMRJobs {
       EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
   private static final int NUM_NODE_MGRS = 3;
   private static final String TEST_IO_SORT_MB = "11";
-  private static final String TEST_GROUP_MAX = "200";
   private static final int DEFAULT_REDUCES = 2;
 
   protected int numSleepReducers = DEFAULT_REDUCES;
@@ -243,58 +238,31 @@ public class TestMRJobs {
   }
 
   @Test(timeout = 300000)
-  public void testConfVerificationWithClassloader() throws Exception {
-    testConfVerification(true, false, false, false);
+  public void testJobClassloader() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testJobClassloader(false);
   }
 
   @Test(timeout = 300000)
-  public void testConfVerificationWithClassloaderCustomClasses()
-      throws Exception {
-    testConfVerification(true, true, false, false);
+  public void testJobClassloaderWithCustomClasses() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    testJobClassloader(true);
   }
 
-  @Test(timeout = 300000)
-  public void testConfVerificationWithOutClassloader() throws Exception {
-    testConfVerification(false, false, false, false);
-  }
-
-  @Test(timeout = 300000)
-  public void testConfVerificationWithJobClient() throws Exception {
-    testConfVerification(false, false, true, false);
-  }
-
-  @Test(timeout = 300000)
-  public void testConfVerificationWithJobClientLocal() throws Exception {
-    testConfVerification(false, false, true, true);
-  }
-
-  private void testConfVerification(boolean useJobClassLoader,
-      boolean useCustomClasses, boolean useJobClientForMonitring,
-      boolean useLocal) throws Exception {
-    LOG.info("\n\n\nStarting testConfVerification()"
-        + " jobClassloader=" + useJobClassLoader
-        + " customClasses=" + useCustomClasses
-        + " jobClient=" + useJobClientForMonitring
-        + " localMode=" + useLocal);
+  private void testJobClassloader(boolean useCustomClasses) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    LOG.info("\n\n\nStarting testJobClassloader()"
+        + " useCustomClasses=" + useCustomClasses);
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
           + " not found. Not running test.");
       return;
     }
-    final Configuration clusterConfig;
-    if (useLocal) {
-      clusterConfig = new Configuration();
-      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
-    } else {
-      clusterConfig = mrCluster.getConfig();
-    }
-    final JobClient jc = new JobClient(clusterConfig);
-    final Configuration sleepConf = new Configuration(clusterConfig);
+    final Configuration sleepConf = new Configuration(mrCluster.getConfig());
     // set master address to local to test that local mode applied iff framework == local
     sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
-    sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER,
-        useJobClassLoader);
+    sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
     if (useCustomClasses) {
       // to test AM loading user classes such as output format class, we want
       // to blacklist them from the system classes (they need to be prepended
@@ -312,7 +280,6 @@ public class TestMRJobs {
     sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
     sleepConf.set(MRJobConfig.REDUCE_LOG_LEVEL, Level.ALL.toString());
     sleepConf.set(MRJobConfig.MAP_JAVA_OPTS, "-verbose:class");
-    sleepConf.set(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TEST_GROUP_MAX);
     final SleepJob sleepJob = new SleepJob();
     sleepJob.setConf(sleepConf);
     final Job job = sleepJob.createJob(1, 1, 10, 1, 10, 1);
@@ -330,26 +297,7 @@ public class TestMRJobs {
       jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
     }
     job.submit();
-    final boolean succeeded;
-    if (useJobClientForMonitring && !useLocal) {
-      // We can't use getJobID in useLocal case because JobClient and Job
-      // point to different instances of LocalJobRunner
-      //
-      final JobID mapredJobID = JobID.downgrade(job.getJobID());
-      RunningJob runningJob = null;
-      do {
-        Thread.sleep(10);
-        runningJob = jc.getJob(mapredJobID);
-      } while (runningJob == null);
-      Assert.assertEquals("Unexpected RunningJob's "
-          + MRJobConfig.COUNTER_GROUPS_MAX_KEY,
-          TEST_GROUP_MAX, runningJob.getConfiguration()
-              .get(MRJobConfig.COUNTER_GROUPS_MAX_KEY));
-      runningJob.waitForCompletion();
-      succeeded = runningJob.isSuccessful();
-    } else {
-      succeeded = job.waitForCompletion(true);
-    }
+    boolean succeeded = job.waitForCompletion(true);
     Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
         succeeded);
   }
@@ -1003,14 +951,5 @@ public class TestMRJobs {
             + ", actual: " + ioSortMb);
       }
     }
-
-    @Override
-    public void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException {
-      super.map(key, value, context);
-      for (int i = 0; i < 100; i++) {
-        context.getCounter("testCounterGroup-" + i,
-            "testCounter").increment(1);
-      }
-    }
   }
 }
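
Editor's note: the deleted test coverage had two halves. A lowered counter-group limit (TEST_GROUP_MAX = "200") was set through the job configuration and then asserted on the RunningJob fetched via JobClient, and the mapper created enough counter groups for that limit to matter at runtime. The core of the runtime stress, as a fragment assuming a Mapper.Context named context in scope:

    // Create many counter groups so a configured group cap (e.g. 200) is
    // actually exercised, mirroring the deleted map() override above.
    for (int i = 0; i < 100; i++) {
      context.getCounter("testCounterGroup-" + i, "testCounter").increment(1);
    }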