Merge -r 1203370:1203371 from trunk to branch. FIXES: MAPREDUCE-3169

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1203402 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 2011-11-17 22:48:13 +00:00
parent 7b8e41d6a1
commit 124d34995a
6 changed files with 594 additions and 0 deletions

hadoop-mapreduce-project/CHANGES.txt

@@ -41,6 +41,9 @@ Release 0.23.1 - Unreleased
    MAPREDUCE-3102. Changed NodeManager to fail fast when LinuxContainerExecutor
    has wrong configuration or permissions. (Hitesh Shah via vinodkv)

+   MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides
+   client APIs cross MR1 and MR2. (Ahmed via tucu)
+
  OPTIMIZATIONS

  BUG FIXES

org/apache/hadoop/mapred/MiniMRClientCluster.java

@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

/**
 * A simple interface for a client MR cluster used for testing. This interface
 * provides basic methods which are independent of the underlying mini cluster
 * (either MR1 or MR2).
 */
public interface MiniMRClientCluster {

  public void start() throws IOException;

  public void stop() throws IOException;

  public Configuration getConfig() throws IOException;

}
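
A note on the intended lifecycle (illustrative sketch, not part of this
commit; MyTest is a placeholder test class): the factory in the next file
returns an already-started cluster, so callers normally just read its
configuration and stop it when done.

    // Spin up a client cluster keyed to the calling test class, with one
    // node manager. The factory starts the underlying cluster itself.
    MiniMRClientCluster cluster = MiniMRClientClusterFactory.create(
        MyTest.class, 1, new Configuration());
    Configuration conf = cluster.getConfig(); // use for job submission
    // ... run jobs against conf ...
    cluster.stop();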

org/apache/hadoop/mapred/MiniMRClientClusterFactory.java

@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;

/**
 * A MiniMRCluster factory. In MR2, it wraps the MiniMRYarnCluster behind the
 * MiniMRClientCluster interface; in MR1, it provides the same wrapper around
 * MiniMRCluster. Tests should use this factory so they can migrate easily
 * between MR1 and MR2.
 */
public class MiniMRClientClusterFactory {

  public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
      Configuration conf) throws IOException {

    if (conf == null) {
      conf = new Configuration();
    }

    FileSystem fs = FileSystem.get(conf);

    Path testRootDir = new Path("target", caller.getName() + "-tmpDir")
        .makeQualified(fs);
    Path appJar = new Path(testRootDir, "MRAppJar.jar");

    // Copy MRAppJar and make it private.
    Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR);
    fs.copyFromLocalFile(appMasterJar, appJar);
    fs.setPermission(appJar, new FsPermission("700"));

    Job job = Job.getInstance(conf);

    job.addFileToClassPath(appJar);
    job.setJarByClass(caller);

    MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller
        .getName(), noOfNMs);
    miniMRYarnCluster.init(job.getConfiguration());
    miniMRYarnCluster.start();

    return new MiniMRYarnClusterAdapter(miniMRYarnCluster);
  }
}
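
For context, the MR1 counterpart of this factory is not part of this diff;
per the javadoc above it would wrap the classic MiniMRCluster instead. A
rough sketch under that assumption, where MiniMRClusterAdapter is a
hypothetical MR1-side adapter class:

    // Hypothetical MR1 variant (not in this commit): same signature, but
    // backed by the classic MR1 MiniMRCluster instead of MiniMRYarnCluster.
    public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
        Configuration conf) throws IOException {
      JobConf jobConf = (conf == null) ? new JobConf() : new JobConf(conf);
      // In MR1, noOfNMs maps naturally onto the number of task trackers.
      MiniMRCluster miniMRCluster = new MiniMRCluster(noOfNMs, "file:///", 1,
          null, null, jobConf);
      return new MiniMRClusterAdapter(miniMRCluster); // hypothetical adapter
    }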

org/apache/hadoop/mapred/MiniMRCluster.java

@@ -0,0 +1,262 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * This class is an MR2 replacement for the MR1 MiniMRCluster, which was used
 * by tests prior to MR2. This replacement class uses the new MiniMRYarnCluster
 * in MR2 but provides the same old MR1 interface, so tests can be migrated
 * from MR1 to MR2 with minimal changes.
 *
 * Due to major differences between MR1 and MR2, a number of methods are either
 * unimplemented/unsupported or were re-implemented to provide wrappers around
 * MR2 functionality.
 */
public class MiniMRCluster {
  private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);

  private MiniMRClientCluster mrClientCluster;

  public String getTaskTrackerLocalDir(int taskTracker) {
    throw new UnsupportedOperationException();
  }

  public String[] getTaskTrackerLocalDirs(int taskTracker) {
    throw new UnsupportedOperationException();
  }

  class JobTrackerRunner {
    // Mock class
  }

  class TaskTrackerRunner {
    // Mock class
  }

  public JobTrackerRunner getJobTrackerRunner() {
    throw new UnsupportedOperationException();
  }

  TaskTrackerRunner getTaskTrackerRunner(int id) {
    throw new UnsupportedOperationException();
  }

  public int getNumTaskTrackers() {
    throw new UnsupportedOperationException();
  }

  public void setInlineCleanupThreads() {
    throw new UnsupportedOperationException();
  }

  public void waitUntilIdle() {
    throw new UnsupportedOperationException();
  }

  private void waitTaskTrackers() {
    throw new UnsupportedOperationException();
  }

  public int getJobTrackerPort() {
    throw new UnsupportedOperationException();
  }

  public JobConf createJobConf() {
    JobConf jobConf = null;
    try {
      jobConf = new JobConf(mrClientCluster.getConfig());
    } catch (IOException e) {
      LOG.error(e);
    }
    return jobConf;
  }

  public JobConf createJobConf(JobConf conf) {
    // Note: the passed conf is ignored; the wrapped cluster's config is used.
    JobConf jobConf = null;
    try {
      jobConf = new JobConf(mrClientCluster.getConfig());
    } catch (IOException e) {
      LOG.error(e);
    }
    return jobConf;
  }

  static JobConf configureJobConf(JobConf conf, String namenode,
      int jobTrackerPort, int jobTrackerInfoPort, UserGroupInformation ugi) {
    throw new UnsupportedOperationException();
  }

  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
      String[] racks, String[] hosts) throws IOException {
    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts);
  }

  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
      String[] racks, String[] hosts, JobConf conf) throws IOException {
    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf);
  }

  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir)
      throws IOException {
    this(0, 0, numTaskTrackers, namenode, numDir);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir) throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        null);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks)
      throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        racks, null);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks,
      String[] hosts) throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        racks, hosts, null);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks,
      String[] hosts, UserGroupInformation ugi) throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        racks, hosts, ugi, null);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks,
      String[] hosts, UserGroupInformation ugi, JobConf conf)
      throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        racks, hosts, ugi, conf, 0);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks,
      String[] hosts, UserGroupInformation ugi, JobConf conf,
      int numTrackerToExclude) throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        racks, hosts, ugi, conf, numTrackerToExclude, new Clock());
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks,
      String[] hosts, UserGroupInformation ugi, JobConf conf,
      int numTrackerToExclude, Clock clock) throws IOException {
    if (conf == null) {
      conf = new JobConf();
    }
    FileSystem.setDefaultUri(conf, namenode);
    mrClientCluster = MiniMRClientClusterFactory.create(this.getClass(),
        numTaskTrackers, conf);
  }

  public UserGroupInformation getUgi() {
    throw new UnsupportedOperationException();
  }

  public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from,
      int max) throws IOException {
    throw new UnsupportedOperationException();
  }

  public void setJobPriority(JobID jobId, JobPriority priority)
      throws AccessControlException, IOException {
    throw new UnsupportedOperationException();
  }

  public JobPriority getJobPriority(JobID jobId) {
    throw new UnsupportedOperationException();
  }

  public long getJobFinishTime(JobID jobId) {
    throw new UnsupportedOperationException();
  }

  public void initializeJob(JobID jobId) throws IOException {
    throw new UnsupportedOperationException();
  }

  public MapTaskCompletionEventsUpdate getMapTaskCompletionEventsUpdates(
      int index, JobID jobId, int max) throws IOException {
    throw new UnsupportedOperationException();
  }

  public JobConf getJobTrackerConf() {
    JobConf jobConf = null;
    try {
      jobConf = new JobConf(mrClientCluster.getConfig());
    } catch (IOException e) {
      LOG.error(e);
    }
    return jobConf;
  }

  public int getFaultCount(String hostName) {
    throw new UnsupportedOperationException();
  }

  public void startJobTracker() {
    // Do nothing
  }

  public void startJobTracker(boolean wait) {
    // Do nothing
  }

  public void stopJobTracker() {
    // Do nothing
  }

  public void stopTaskTracker(int id) {
    // Do nothing
  }

  public void startTaskTracker(String host, String rack, int idx, int numDir)
      throws IOException {
    // Do nothing
  }

  void addTaskTracker(TaskTrackerRunner taskTracker) {
    throw new UnsupportedOperationException();
  }

  int getTaskTrackerID(String trackerName) {
    throw new UnsupportedOperationException();
  }

  public void shutdown() {
    try {
      mrClientCluster.stop();
    } catch (IOException e) {
      LOG.error(e);
    }
  }
}
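
The practical effect is that an MR1-era test body compiles and runs unchanged
on MR2, with only JobTracker/TaskTracker-specific calls failing fast. An
illustrative snippet (not from this commit), using only methods defined above:

    // Old MR1 test code: now transparently backed by a MiniMRYarnCluster.
    MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1);
    JobConf jobConf = mr.createJobConf(); // wraps mrClientCluster.getConfig()
    // ... submit jobs with jobConf ...
    // JobTracker-specific calls now throw UnsupportedOperationException,
    // e.g. mr.getJobTrackerPort()
    mr.shutdown(); // stops the underlying client cluster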

org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java

@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;

/**
 * An adapter for MiniMRYarnCluster providing a MiniMRClientCluster interface.
 * This interface could be used by tests across both MR1 and MR2.
 */
public class MiniMRYarnClusterAdapter implements MiniMRClientCluster {

  private MiniMRYarnCluster miniMRYarnCluster;

  public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) {
    this.miniMRYarnCluster = miniMRYarnCluster;
  }

  @Override
  public Configuration getConfig() {
    return miniMRYarnCluster.getConfig();
  }

  @Override
  public void start() {
    miniMRYarnCluster.start();
  }

  @Override
  public void stop() {
    miniMRYarnCluster.stop();
  }
}

org/apache/hadoop/mapred/TestMiniMRClientCluster.java

@@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Basic testing for the MiniMRClientCluster. This test shows an example class
 * that can be used in MR1 or MR2, without any change to the test. The test
 * will use MiniMRYarnCluster in MR2, and MiniMRCluster in MR1.
 */
public class TestMiniMRClientCluster {

  private static Path inDir = null;
  private static Path outDir = null;
  private static Path testdir = null;
  private static Path[] inFiles = new Path[5];
  private static MiniMRClientCluster mrCluster;

  @BeforeClass
  public static void setup() throws IOException {
    final Configuration conf = new Configuration();
    final Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
        "/tmp"));
    testdir = new Path(TEST_ROOT_DIR, "TestMiniMRClientCluster");
    inDir = new Path(testdir, "in");
    outDir = new Path(testdir, "out");

    FileSystem fs = FileSystem.getLocal(conf);
    if (fs.exists(testdir) && !fs.delete(testdir, true)) {
      throw new IOException("Could not delete " + testdir);
    }
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir);
    }

    for (int i = 0; i < inFiles.length; i++) {
      inFiles[i] = new Path(inDir, "part_" + i);
      createFile(inFiles[i], conf);
    }

    // create the mini cluster to be used for the tests
    mrCluster = MiniMRClientClusterFactory.create(
        TestMiniMRClientCluster.class, 1, new Configuration());
  }

  @AfterClass
  public static void cleanup() throws IOException {
    // clean up the input and output files
    final Configuration conf = new Configuration();
    final FileSystem fs = testdir.getFileSystem(conf);
    if (fs.exists(testdir)) {
      fs.delete(testdir, true);
    }
    // stopping the mini cluster
    mrCluster.stop();
  }

  @Test
  public void testJob() throws Exception {
    final Job job = createJob();
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job,
        inDir);
    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job,
        new Path(outDir, "testJob"));
    assertTrue(job.waitForCompletion(true));
    validateCounters(job.getCounters(), 5, 25, 5, 5);
  }

  private void validateCounters(Counters counters, long mapInputRecords,
      long mapOutputRecords, long reduceInputGroups, long reduceOutputRecords) {
    assertEquals("MapInputRecords", mapInputRecords, counters.findCounter(
        "MyCounterGroup", "MAP_INPUT_RECORDS").getValue());
    assertEquals("MapOutputRecords", mapOutputRecords, counters.findCounter(
        "MyCounterGroup", "MAP_OUTPUT_RECORDS").getValue());
    assertEquals("ReduceInputGroups", reduceInputGroups, counters.findCounter(
        "MyCounterGroup", "REDUCE_INPUT_GROUPS").getValue());
    assertEquals("ReduceOutputRecords", reduceOutputRecords, counters
        .findCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS").getValue());
  }

  private static void createFile(Path inFile, Configuration conf)
      throws IOException {
    final FileSystem fs = inFile.getFileSystem(conf);
    if (fs.exists(inFile)) {
      return;
    }
    FSDataOutputStream out = fs.create(inFile);
    out.writeBytes("This is a test file");
    out.close();
  }

  public static Job createJob() throws IOException {
    final Job baseJob = new Job(mrCluster.getConfig());
    baseJob.setOutputKeyClass(Text.class);
    baseJob.setOutputValueClass(IntWritable.class);
    baseJob.setMapperClass(MyMapper.class);
    baseJob.setReducerClass(MyReducer.class);
    baseJob.setNumReduceTasks(1);
    return baseJob;
  }

  public static class MyMapper extends
      org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      context.getCounter("MyCounterGroup", "MAP_INPUT_RECORDS").increment(1);
      StringTokenizer iter = new StringTokenizer(value.toString());
      while (iter.hasMoreTokens()) {
        word.set(iter.nextToken());
        context.write(word, one);
        context.getCounter("MyCounterGroup", "MAP_OUTPUT_RECORDS").increment(1);
      }
    }
  }

  public static class MyReducer extends
      org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      context.getCounter("MyCounterGroup", "REDUCE_INPUT_GROUPS").increment(1);
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
      context.getCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS")
          .increment(1);
    }
  }
}