MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides client APIs cross MR1 and MR2. (Ahmed via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203371 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 9d7402e0af
commit 302e3dfe2c
CHANGES.txt
@@ -34,6 +34,9 @@ Trunk (unreleased changes)

    MAPREDUCE-3149. Add a test to verify that TokenCache handles file system
    uri with no authority. (John George via jitendra)

    MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides
    client APIs cross MR1 and MR2. (Ahmed via tucu)

  BUG FIXES

    MAPREDUCE-3346. [Rumen] LoggedTaskAttempt#getHostName() returns null.
    (amarrk)
MiniMRClientCluster.java (new file)
@@ -0,0 +1,38 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

/*
 * A simple interface for a client MR cluster used for testing. This interface
 * provides basic methods which are independent of the underlying Mini Cluster
 * (either through MR1 or MR2).
 */
public interface MiniMRClientCluster {

  public void start() throws IOException;

  public void stop() throws IOException;

  public Configuration getConfig() throws IOException;

}
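The interface deliberately exposes only lifecycle and configuration methods, so tests stay independent of the underlying mini cluster. As a hypothetical sketch (not part of this commit; the class name and wiring are illustrative assumptions), an MR1-side implementation could satisfy the same contract by delegating to the classic MR1 MiniMRCluster, which starts from its constructor and exposes createJobConf() and shutdown():

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

// Hypothetical MR1 adapter sketch; here "MiniMRCluster" means the classic
// MR1 class, not the MR2 shim introduced later in this commit.
public class MiniMRClassicClusterAdapter implements MiniMRClientCluster {

  private final MiniMRCluster delegate;

  public MiniMRClassicClusterAdapter(MiniMRCluster delegate) {
    this.delegate = delegate;
  }

  @Override
  public void start() throws IOException {
    // The MR1 MiniMRCluster is started by its constructor; nothing to do.
  }

  @Override
  public void stop() throws IOException {
    delegate.shutdown();
  }

  @Override
  public Configuration getConfig() throws IOException {
    return delegate.createJobConf();
  }
}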
MiniMRClientClusterFactory.java (new file)
@@ -0,0 +1,70 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;

/**
 * A MiniMRCluster factory. In MR2 it provides a MiniMRClientCluster wrapper
 * around the MiniMRYarnCluster, while in MR1 it provides such a wrapper
 * around MiniMRCluster. This factory should be used in tests to provide an
 * easy migration of tests across MR1 and MR2.
 */
public class MiniMRClientClusterFactory {

  public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
      Configuration conf) throws IOException {

    if (conf == null) {
      conf = new Configuration();
    }

    FileSystem fs = FileSystem.get(conf);

    Path testRootDir = new Path("target", caller.getName() + "-tmpDir")
        .makeQualified(fs);
    Path appJar = new Path(testRootDir, "MRAppJar.jar");

    // Copy MRAppJar and make it private.
    Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR);

    fs.copyFromLocalFile(appMasterJar, appJar);
    fs.setPermission(appJar, new FsPermission("700"));

    Job job = Job.getInstance(conf);

    job.addFileToClassPath(appJar);
    job.setJarByClass(caller);

    MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(
        caller.getName(), noOfNMs);
    miniMRYarnCluster.init(job.getConfiguration());
    miniMRYarnCluster.start();

    return new MiniMRYarnClusterAdapter(miniMRYarnCluster);
  }

}
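In practice a test never constructs the adapter directly; it asks the factory for a cluster and works against the interface only. A minimal sketch of that flow, assuming a one-node cluster and a placeholder test class name MyTest (this mirrors TestMiniMRClientCluster below):

// Sketch of the intended factory usage, inside a test method that is
// declared to throw Exception; MyTest is a placeholder name.
MiniMRClientCluster cluster =
    MiniMRClientClusterFactory.create(MyTest.class, 1, new Configuration());
try {
  Job job = Job.getInstance(cluster.getConfig()); // submit against the cluster
  // ... set mapper, reducer, and input/output paths ...
  job.waitForCompletion(true);
} finally {
  cluster.stop(); // tears down the underlying MiniMRYarnCluster
}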
MiniMRCluster.java (new file)
@@ -0,0 +1,262 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * This class is an MR2 replacement for the older MR1 MiniMRCluster that tests
 * used prior to MR2. This replacement class uses the new MiniMRYarnCluster in
 * MR2 but provides the same old MR1 interface, so tests can be migrated from
 * MR1 to MR2 with minimal changes.
 *
 * Due to major differences between MR1 and MR2, a number of methods are
 * either unimplemented/unsupported or were re-implemented to provide wrappers
 * around MR2 functionality.
 */
public class MiniMRCluster {
  private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);

  private MiniMRClientCluster mrClientCluster;

  public String getTaskTrackerLocalDir(int taskTracker) {
    throw new UnsupportedOperationException();
  }

  public String[] getTaskTrackerLocalDirs(int taskTracker) {
    throw new UnsupportedOperationException();
  }

  class JobTrackerRunner {
    // Mock class
  }

  class TaskTrackerRunner {
    // Mock class
  }

  public JobTrackerRunner getJobTrackerRunner() {
    throw new UnsupportedOperationException();
  }

  TaskTrackerRunner getTaskTrackerRunner(int id) {
    throw new UnsupportedOperationException();
  }

  public int getNumTaskTrackers() {
    throw new UnsupportedOperationException();
  }

  public void setInlineCleanupThreads() {
    throw new UnsupportedOperationException();
  }

  public void waitUntilIdle() {
    throw new UnsupportedOperationException();
  }

  private void waitTaskTrackers() {
    throw new UnsupportedOperationException();
  }

  public int getJobTrackerPort() {
    throw new UnsupportedOperationException();
  }

  public JobConf createJobConf() {
    JobConf jobConf = null;
    try {
      jobConf = new JobConf(mrClientCluster.getConfig());
    } catch (IOException e) {
      LOG.error(e);
    }
    return jobConf;
  }

  public JobConf createJobConf(JobConf conf) {
    // Note: the passed-in conf is ignored; the wrapped cluster's
    // configuration is returned, as in the no-argument overload.
    JobConf jobConf = null;
    try {
      jobConf = new JobConf(mrClientCluster.getConfig());
    } catch (IOException e) {
      LOG.error(e);
    }
    return jobConf;
  }

  static JobConf configureJobConf(JobConf conf, String namenode,
      int jobTrackerPort, int jobTrackerInfoPort, UserGroupInformation ugi) {
    throw new UnsupportedOperationException();
  }

  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
      String[] racks, String[] hosts) throws IOException {
    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts);
  }

  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
      String[] racks, String[] hosts, JobConf conf) throws IOException {
    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf);
  }

  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir)
      throws IOException {
    this(0, 0, numTaskTrackers, namenode, numDir);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir) throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        null);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks)
      throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        racks, null);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks,
      String[] hosts) throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        racks, hosts, null);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks,
      String[] hosts, UserGroupInformation ugi) throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        racks, hosts, ugi, null);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks,
      String[] hosts, UserGroupInformation ugi, JobConf conf)
      throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        racks, hosts, ugi, conf, 0);
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks,
      String[] hosts, UserGroupInformation ugi, JobConf conf,
      int numTrackerToExclude) throws IOException {
    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
        racks, hosts, ugi, conf, numTrackerToExclude, new Clock());
  }

  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
      int numTaskTrackers, String namenode, int numDir, String[] racks,
      String[] hosts, UserGroupInformation ugi, JobConf conf,
      int numTrackerToExclude, Clock clock) throws IOException {
    if (conf == null) {
      conf = new JobConf();
    }
    FileSystem.setDefaultUri(conf, namenode);
    mrClientCluster = MiniMRClientClusterFactory.create(this.getClass(),
        numTaskTrackers, conf);
  }

  public UserGroupInformation getUgi() {
    throw new UnsupportedOperationException();
  }

  public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from,
      int max) throws IOException {
    throw new UnsupportedOperationException();
  }

  public void setJobPriority(JobID jobId, JobPriority priority)
      throws AccessControlException, IOException {
    throw new UnsupportedOperationException();
  }

  public JobPriority getJobPriority(JobID jobId) {
    throw new UnsupportedOperationException();
  }

  public long getJobFinishTime(JobID jobId) {
    throw new UnsupportedOperationException();
  }

  public void initializeJob(JobID jobId) throws IOException {
    throw new UnsupportedOperationException();
  }

  public MapTaskCompletionEventsUpdate getMapTaskCompletionEventsUpdates(
      int index, JobID jobId, int max) throws IOException {
    throw new UnsupportedOperationException();
  }

  public JobConf getJobTrackerConf() {
    JobConf jobConf = null;
    try {
      jobConf = new JobConf(mrClientCluster.getConfig());
    } catch (IOException e) {
      LOG.error(e);
    }
    return jobConf;
  }

  public int getFaultCount(String hostName) {
    throw new UnsupportedOperationException();
  }

  public void startJobTracker() {
    // Do nothing
  }

  public void startJobTracker(boolean wait) {
    // Do nothing
  }

  public void stopJobTracker() {
    // Do nothing
  }

  public void stopTaskTracker(int id) {
    // Do nothing
  }

  public void startTaskTracker(String host, String rack, int idx, int numDir)
      throws IOException {
    // Do nothing
  }

  void addTaskTracker(TaskTrackerRunner taskTracker) {
    throw new UnsupportedOperationException();
  }

  int getTaskTrackerID(String trackerName) {
    throw new UnsupportedOperationException();
  }

  public void shutdown() {
    try {
      mrClientCluster.stop();
    } catch (IOException e) {
      LOG.error(e);
    }
  }

}
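The net effect is that unmodified MR1-style test code keeps compiling and running, now backed by YARN. A brief sketch of that migration path, assuming a local file system URI (the constructor and methods used here are the ones defined above):

// Existing MR1-style test code; under MR2 these calls are served by a
// MiniMRYarnCluster behind the MiniMRClientCluster facade. The "task
// tracker" count is passed to the factory as the number of node managers.
MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1);
JobConf conf = mr.createJobConf(); // returns the YARN cluster's config
// ... submit jobs against conf as before ...
mr.shutdown();                     // stops the underlying mini cluster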
MiniMRYarnClusterAdapter.java (new file)
@@ -0,0 +1,51 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;

/**
 * An adapter for MiniMRYarnCluster providing a MiniMRClientCluster interface.
 * This interface could be used by tests across both MR1 and MR2.
 */
public class MiniMRYarnClusterAdapter implements MiniMRClientCluster {

  private MiniMRYarnCluster miniMRYarnCluster;

  public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) {
    this.miniMRYarnCluster = miniMRYarnCluster;
  }

  @Override
  public Configuration getConfig() {
    return miniMRYarnCluster.getConfig();
  }

  @Override
  public void start() {
    miniMRYarnCluster.start();
  }

  @Override
  public void stop() {
    miniMRYarnCluster.stop();
  }

}
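The adapter is intentionally thin: each interface method delegates straight to the wrapped MiniMRYarnCluster, so the factory can hand tests a MiniMRClientCluster without leaking any YARN types. A minimal sketch of wiring it up by hand, following the same init-then-start sequence the factory uses ("example" is an arbitrary cluster name; the factory additionally stages the MR app jar):

// Manual wiring sketch, for illustration only.
MiniMRYarnCluster yarn = new MiniMRYarnCluster("example", 1);
yarn.init(new Configuration());
MiniMRClientCluster cluster = new MiniMRYarnClusterAdapter(yarn);
cluster.start();  // delegates to yarn.start()
Configuration conf = cluster.getConfig();
cluster.stop();   // delegates to yarn.stop()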
TestMiniMRClientCluster.java (new file)
@@ -0,0 +1,170 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Basic testing for the MiniMRClientCluster. This test shows an example class
 * that can be used in MR1 or MR2, without any change to the test. The test
 * will use MiniMRYarnCluster in MR2, and MiniMRCluster in MR1.
 */
public class TestMiniMRClientCluster {

  private static Path inDir = null;
  private static Path outDir = null;
  private static Path testdir = null;
  private static Path[] inFiles = new Path[5];
  private static MiniMRClientCluster mrCluster;

  @BeforeClass
  public static void setup() throws IOException {
    final Configuration conf = new Configuration();
    final Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
        "/tmp"));
    testdir = new Path(TEST_ROOT_DIR, "TestMiniMRClientCluster");
    inDir = new Path(testdir, "in");
    outDir = new Path(testdir, "out");

    FileSystem fs = FileSystem.getLocal(conf);
    if (fs.exists(testdir) && !fs.delete(testdir, true)) {
      throw new IOException("Could not delete " + testdir);
    }
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir);
    }

    for (int i = 0; i < inFiles.length; i++) {
      inFiles[i] = new Path(inDir, "part_" + i);
      createFile(inFiles[i], conf);
    }

    // create the mini cluster to be used for the tests
    mrCluster = MiniMRClientClusterFactory.create(
        TestMiniMRClientCluster.class, 1, new Configuration());
  }

  @AfterClass
  public static void cleanup() throws IOException {
    // clean up the input and output files
    final Configuration conf = new Configuration();
    final FileSystem fs = testdir.getFileSystem(conf);
    if (fs.exists(testdir)) {
      fs.delete(testdir, true);
    }
    // stopping the mini cluster
    mrCluster.stop();
  }

  @Test
  public void testJob() throws Exception {
    final Job job = createJob();
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job,
        inDir);
    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job,
        new Path(outDir, "testJob"));
    assertTrue(job.waitForCompletion(true));
    validateCounters(job.getCounters(), 5, 25, 5, 5);
  }

  private void validateCounters(Counters counters, long mapInputRecords,
      long mapOutputRecords, long reduceInputGroups, long reduceOutputRecords) {
    assertEquals("MapInputRecords", mapInputRecords, counters.findCounter(
        "MyCounterGroup", "MAP_INPUT_RECORDS").getValue());
    assertEquals("MapOutputRecords", mapOutputRecords, counters.findCounter(
        "MyCounterGroup", "MAP_OUTPUT_RECORDS").getValue());
    assertEquals("ReduceInputGroups", reduceInputGroups, counters.findCounter(
        "MyCounterGroup", "REDUCE_INPUT_GROUPS").getValue());
    assertEquals("ReduceOutputRecords", reduceOutputRecords, counters
        .findCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS").getValue());
  }

  private static void createFile(Path inFile, Configuration conf)
      throws IOException {
    final FileSystem fs = inFile.getFileSystem(conf);
    if (fs.exists(inFile)) {
      return;
    }
    FSDataOutputStream out = fs.create(inFile);
    out.writeBytes("This is a test file");
    out.close();
  }

  public static Job createJob() throws IOException {
    final Job baseJob = new Job(mrCluster.getConfig());
    baseJob.setOutputKeyClass(Text.class);
    baseJob.setOutputValueClass(IntWritable.class);
    baseJob.setMapperClass(MyMapper.class);
    baseJob.setReducerClass(MyReducer.class);
    baseJob.setNumReduceTasks(1);
    return baseJob;
  }

  public static class MyMapper extends
      org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      context.getCounter("MyCounterGroup", "MAP_INPUT_RECORDS").increment(1);
      StringTokenizer iter = new StringTokenizer(value.toString());
      while (iter.hasMoreTokens()) {
        word.set(iter.nextToken());
        context.write(word, one);
        context.getCounter("MyCounterGroup", "MAP_OUTPUT_RECORDS").increment(1);
      }
    }
  }

  public static class MyReducer extends
      org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      context.getCounter("MyCounterGroup", "REDUCE_INPUT_GROUPS").increment(1);
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
      context.getCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS")
          .increment(1);
    }
  }

}