svn merge -c 1443027. FIXES: MAPREDUCE-4905. test org.apache.hadoop.mapred.pipes (Aleksey Gorshkov via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1443033 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2013-02-06 15:57:46 +00:00
parent 1bce97d9ef
commit dcbad333ad
8 changed files with 1262 additions and 1 deletions

View File

@ -539,6 +539,9 @@ Release 0.23.7 - UNRELEASED
IMPROVEMENTS
MAPREDUCE-4905. test org.apache.hadoop.mapred.pipes
(Aleksey Gorshkov via bobby)
OPTIMIZATIONS
MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.mapred.lib.LazyOutputFormat;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
@ -515,7 +516,7 @@ public class Submitter extends Configured implements Tool {
*/
public static void main(String[] args) throws Exception {
int exitCode = new Submitter().run(args);
System.exit(exitCode);
ExitUtil.terminate(exitCode);
}
}

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.pipes;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import javax.crypto.SecretKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
public class CommonStub {
protected Socket socket = null;
protected DataInputStream dataInput;
protected DataOutputStream dataOut;
protected String createDigest(byte[] password, String data) throws IOException {
SecretKey key = JobTokenSecretManager.createSecretKey(password);
return SecureShuffleUtils.hashFromString(data, key);
}
protected void readObject(Writable obj, DataInputStream inStream) throws IOException {
int numBytes = WritableUtils.readVInt(inStream);
byte[] buffer;
// For BytesWritable and Text, use the specified length to set the length
// this causes the "obvious" translations to work. So that if you emit
// a string "abc" from C++, it shows up as "abc".
if (obj instanceof BytesWritable) {
buffer = new byte[numBytes];
inStream.readFully(buffer);
((BytesWritable) obj).set(buffer, 0, numBytes);
} else if (obj instanceof Text) {
buffer = new byte[numBytes];
inStream.readFully(buffer);
((Text) obj).set(buffer);
} else {
obj.readFields(inStream);
}
}
protected void writeObject(Writable obj, DataOutputStream stream)
throws IOException {
// For Text and BytesWritable, encode them directly, so that they end up
// in C++ as the natural translations.
DataOutputBuffer buffer = new DataOutputBuffer();
if (obj instanceof Text) {
Text t = (Text) obj;
int len = t.getLength();
WritableUtils.writeVLong(stream, len);
stream.flush();
stream.write(t.getBytes(), 0, len);
stream.flush();
} else if (obj instanceof BytesWritable) {
BytesWritable b = (BytesWritable) obj;
int len = b.getLength();
WritableUtils.writeVLong(stream, len);
stream.write(b.getBytes(), 0, len);
} else {
buffer.reset();
obj.write(buffer);
int length = buffer.getLength();
WritableUtils.writeVInt(stream, length);
stream.write(buffer.getData(), 0, length);
}
stream.flush();
}
protected void initSoket() throws Exception {
int port = Integer.parseInt(System.getenv("mapreduce.pipes.command.port"));
java.net.InetAddress address = java.net.InetAddress.getLocalHost();
socket = new Socket(address.getHostName(), port);
InputStream input = socket.getInputStream();
OutputStream output = socket.getOutputStream();
// try to read
dataInput = new DataInputStream(input);
WritableUtils.readVInt(dataInput);
String str = Text.readString(dataInput);
Text.readString(dataInput);
dataOut = new DataOutputStream(output);
WritableUtils.writeVInt(dataOut, 57);
String s = createDigest("password".getBytes(), str);
Text.writeString(dataOut, s);
// start
WritableUtils.readVInt(dataInput);
int cuttentAnswer = WritableUtils.readVInt(dataInput);
System.out.println("CURRENT_PROTOCOL_VERSION:" + cuttentAnswer);
// get configuration
// should be MessageType.SET_JOB_CONF.code
WritableUtils.readVInt(dataInput);
// array length
int j = WritableUtils.readVInt(dataInput);
for (int i = 0; i < j; i++) {
Text.readString(dataInput);
i++;
Text.readString(dataInput);
}
}
protected void closeSoket() {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.pipes;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
/*
Stub for TestPipeApplication test. This stub produced test data for main test. Main test checks data
*/
public class PipeApplicationRunnableStub extends CommonStub {
public static void main(String[] args) {
PipeApplicationRunnableStub client = new PipeApplicationRunnableStub();
client.binaryProtocolStub();
}
public void binaryProtocolStub() {
try {
initSoket();
System.out.println("start OK");
// RUN_MAP.code
// should be 3
int answer = WritableUtils.readVInt(dataInput);
System.out.println("RunMap:" + answer);
TestPipeApplication.FakeSplit split = new TestPipeApplication.FakeSplit();
readObject(split, dataInput);
WritableUtils.readVInt(dataInput);
WritableUtils.readVInt(dataInput);
// end runMap
// get InputTypes
WritableUtils.readVInt(dataInput);
String inText = Text.readString(dataInput);
System.out.println("Key class:" + inText);
inText = Text.readString(dataInput);
System.out.println("Value class:" + inText);
@SuppressWarnings("unused")
int inCode = 0;
// read all data from sender and write to output
while ((inCode = WritableUtils.readVInt(dataInput)) == 4) {
FloatWritable key = new FloatWritable();
NullWritable value = NullWritable.get();
readObject(key, dataInput);
System.out.println("value:" + key.get());
readObject(value, dataInput);
}
WritableUtils.writeVInt(dataOut, 54);
dataOut.flush();
dataOut.close();
} catch (Exception x) {
x.printStackTrace();
} finally {
closeSoket();
}
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.pipes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
/*
Stub for TestPipeApplication test. This stub produced test data for main test. Main test checks data
*/
public class PipeApplicationStub extends CommonStub {
public static void main(String[] args) {
PipeApplicationStub client = new PipeApplicationStub();
client.binaryProtocolStub();
}
public void binaryProtocolStub() {
try {
initSoket();
// output code
WritableUtils.writeVInt(dataOut, 50);
IntWritable wt = new IntWritable();
wt.set(123);
writeObject(wt, dataOut);
writeObject(new Text("value"), dataOut);
// PARTITIONED_OUTPUT
WritableUtils.writeVInt(dataOut, 51);
WritableUtils.writeVInt(dataOut, 0);
writeObject(wt, dataOut);
writeObject(new Text("value"), dataOut);
// STATUS
WritableUtils.writeVInt(dataOut, 52);
Text.writeString(dataOut, "PROGRESS");
dataOut.flush();
// progress
WritableUtils.writeVInt(dataOut, 53);
dataOut.writeFloat(0.55f);
// register counter
WritableUtils.writeVInt(dataOut, 55);
// id
WritableUtils.writeVInt(dataOut, 0);
Text.writeString(dataOut, "group");
Text.writeString(dataOut, "name");
// increment counter
WritableUtils.writeVInt(dataOut, 56);
WritableUtils.writeVInt(dataOut, 0);
WritableUtils.writeVLong(dataOut, 2);
// map item
int intValue = WritableUtils.readVInt(dataInput);
System.out.println("intValue:" + intValue);
IntWritable iw = new IntWritable();
readObject(iw, dataInput);
System.out.println("key:" + iw.get());
Text txt = new Text();
readObject(txt, dataInput);
System.out.println("value:" + txt.toString());
// done
// end of session
WritableUtils.writeVInt(dataOut, 54);
System.out.println("finish");
dataOut.flush();
dataOut.close();
} catch (Exception x) {
x.printStackTrace();
} finally {
closeSoket();
}
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.pipes;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
/*
Stub for TestPipeApplication test. This stub produced test data for main test. Main test checks data
*/
public class PipeReducerStub extends CommonStub {
public static void main(String[] args) {
PipeReducerStub client = new PipeReducerStub();
client.binaryProtocolStub();
}
public void binaryProtocolStub() {
try {
initSoket();
//should be 5
//RUN_REDUCE boolean
WritableUtils.readVInt(dataInput);
WritableUtils.readVInt(dataInput);
int intValue = WritableUtils.readVInt(dataInput);
System.out.println("getIsJavaRecordWriter:" + intValue);
// reduce key
WritableUtils.readVInt(dataInput);
// value of reduce key
BooleanWritable value = new BooleanWritable();
readObject(value, dataInput);
System.out.println("reducer key :" + value);
// reduce value code:
// reduce values
while ((intValue = WritableUtils.readVInt(dataInput)) == 7) {
Text txt = new Text();
// value
readObject(txt, dataInput);
System.out.println("reduce value :" + txt);
}
// done
WritableUtils.writeVInt(dataOut, 54);
dataOut.flush();
dataOut.close();
} catch (Exception x) {
x.printStackTrace();
} finally {
closeSoket();
}
}
}

View File

@ -0,0 +1,747 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.pipes;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestPipeApplication {
private static File workSpace = new File("target",
TestPipeApplication.class.getName() + "-workSpace");
private static String taskName = "attempt_001_02_r03_04_05";
/**
* test PipesMapRunner test the transfer data from reader
*
* @throws Exception
*/
@Test
public void testRunner() throws Exception {
// clean old password files
File[] psw = cleanTokenPasswordFile();
try {
RecordReader<FloatWritable, NullWritable> rReader = new ReaderPipesMapRunner();
JobConf conf = new JobConf();
conf.set(Submitter.IS_JAVA_RR, "true");
// for stdour and stderror
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskName);
CombineOutputCollector<IntWritable, Text> output = new CombineOutputCollector<IntWritable, Text>(
new Counters.Counter(), new Progress());
FileSystem fs = new RawLocalFileSystem();
fs.setConf(conf);
Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
new Path(workSpace + File.separator + "outfile"), IntWritable.class,
Text.class, null, null);
output.setWriter(wr);
// stub for client
File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub");
conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());
// token for authorization
Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>(
"user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
"service"));
conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
conf.setBoolean(MRJobConfig.SKIP_RECORDS, true);
TestTaskReporter reporter = new TestTaskReporter();
PipesMapRunner<FloatWritable, NullWritable, IntWritable, Text> runner = new PipesMapRunner<FloatWritable, NullWritable, IntWritable, Text>();
initStdOut(conf);
runner.configure(conf);
runner.run(rReader, output, reporter);
String stdOut = readStdOut(conf);
// test part of translated data. As common file for client and test -
// clients stdOut
// check version
assertTrue(stdOut.contains("CURRENT_PROTOCOL_VERSION:0"));
// check key and value classes
assertTrue(stdOut
.contains("Key class:org.apache.hadoop.io.FloatWritable"));
assertTrue(stdOut
.contains("Value class:org.apache.hadoop.io.NullWritable"));
// test have sent all data from reader
assertTrue(stdOut.contains("value:0.0"));
assertTrue(stdOut.contains("value:9.0"));
} finally {
if (psw != null) {
// remove password files
for (File file : psw) {
file.deleteOnExit();
}
}
}
}
/**
* test org.apache.hadoop.mapred.pipes.Application
* test a internal functions: MessageType.REGISTER_COUNTER, INCREMENT_COUNTER, STATUS, PROGRESS...
*
* @throws Throwable
*/
@Test
public void testApplication() throws Throwable {
JobConf conf = new JobConf();
RecordReader<FloatWritable, NullWritable> rReader = new Reader();
// client for test
File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationStub");
TestTaskReporter reporter = new TestTaskReporter();
File[] psw = cleanTokenPasswordFile();
try {
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskName);
conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());
// token for authorization
Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>(
"user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
"service"));
conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
FakeCollector output = new FakeCollector(new Counters.Counter(),
new Progress());
FileSystem fs = new RawLocalFileSystem();
fs.setConf(conf);
Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
new Path(workSpace.getAbsolutePath() + File.separator + "outfile"),
IntWritable.class, Text.class, null, null);
output.setWriter(wr);
conf.set(Submitter.PRESERVE_COMMANDFILE, "true");
Application<WritableComparable<IntWritable>, Writable, IntWritable, Text> application = new Application<WritableComparable<IntWritable>, Writable, IntWritable, Text>(
conf, rReader, output, reporter, IntWritable.class, Text.class);
application.getDownlink().flush();
application.getDownlink().mapItem(new IntWritable(3), new Text("txt"));
application.getDownlink().flush();
application.waitForFinish();
wr.close();
// test getDownlink().mapItem();
String stdOut = readStdOut(conf);
assertTrue(stdOut.contains("key:3"));
assertTrue(stdOut.contains("value:txt"));
// reporter test counter, and status should be sended
// test MessageType.REGISTER_COUNTER and INCREMENT_COUNTER
assertEquals(1.0, reporter.getProgress(), 0.01);
assertNotNull(reporter.getCounter("group", "name"));
// test status MessageType.STATUS
assertEquals(reporter.getStatus(), "PROGRESS");
stdOut = readFile(new File(workSpace.getAbsolutePath() + File.separator
+ "outfile"));
// check MessageType.PROGRESS
assertEquals(0.55f, rReader.getProgress(), 0.001);
application.getDownlink().close();
// test MessageType.OUTPUT
Entry<IntWritable, Text> entry = output.getCollect().entrySet()
.iterator().next();
assertEquals(123, entry.getKey().get());
assertEquals("value", entry.getValue().toString());
try {
// try to abort
application.abort(new Throwable());
fail();
} catch (IOException e) {
// abort works ?
assertEquals("pipe child exception", e.getMessage());
}
} finally {
if (psw != null) {
// remove password files
for (File file : psw) {
file.deleteOnExit();
}
}
}
}
/**
* test org.apache.hadoop.mapred.pipes.Submitter
*
* @throws Exception
*/
@Test
public void testSubmitter() throws Exception {
JobConf conf = new JobConf();
File[] psw = cleanTokenPasswordFile();
System.setProperty("test.build.data",
"target/tmp/build/TEST_SUBMITTER_MAPPER/data");
conf.set("hadoop.log.dir", "target/tmp");
// prepare configuration
Submitter.setIsJavaMapper(conf, false);
Submitter.setIsJavaReducer(conf, false);
Submitter.setKeepCommandFile(conf, false);
Submitter.setIsJavaRecordReader(conf, false);
Submitter.setIsJavaRecordWriter(conf, false);
PipesPartitioner<IntWritable, Text> partitioner = new PipesPartitioner<IntWritable, Text>();
partitioner.configure(conf);
Submitter.setJavaPartitioner(conf, partitioner.getClass());
assertEquals(PipesPartitioner.class, (Submitter.getJavaPartitioner(conf)));
// test going to call main method with System.exit(). Change Security
SecurityManager securityManager = System.getSecurityManager();
// store System.out
PrintStream oldps = System.out;
ByteArrayOutputStream out = new ByteArrayOutputStream();
ExitUtil.disableSystemExit();
// test without parameters
try {
System.setOut(new PrintStream(out));
Submitter.main(new String[0]);
fail();
} catch (ExitUtil.ExitException e) {
// System.exit prohibited! output message test
assertTrue(out.toString().contains(""));
assertTrue(out.toString().contains("bin/hadoop pipes"));
assertTrue(out.toString().contains("[-input <path>] // Input directory"));
assertTrue(out.toString()
.contains("[-output <path>] // Output directory"));
assertTrue(out.toString().contains("[-jar <jar file> // jar filename"));
assertTrue(out.toString().contains(
"[-inputformat <class>] // InputFormat class"));
assertTrue(out.toString().contains("[-map <class>] // Java Map class"));
assertTrue(out.toString().contains(
"[-partitioner <class>] // Java Partitioner"));
assertTrue(out.toString().contains(
"[-reduce <class>] // Java Reduce class"));
assertTrue(out.toString().contains(
"[-writer <class>] // Java RecordWriter"));
assertTrue(out.toString().contains(
"[-program <executable>] // executable URI"));
assertTrue(out.toString().contains(
"[-reduces <num>] // number of reduces"));
assertTrue(out.toString().contains(
"[-lazyOutput <true/false>] // createOutputLazily"));
assertTrue(out
.toString()
.contains(
"-conf <configuration file> specify an application configuration file"));
assertTrue(out.toString().contains(
"-D <property=value> use value for given property"));
assertTrue(out.toString().contains(
"-fs <local|namenode:port> specify a namenode"));
assertTrue(out.toString().contains(
"-jt <local|jobtracker:port> specify a job tracker"));
assertTrue(out
.toString()
.contains(
"-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster"));
assertTrue(out
.toString()
.contains(
"-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath."));
assertTrue(out
.toString()
.contains(
"-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines."));
} finally {
System.setOut(oldps);
// restore
System.setSecurityManager(securityManager);
if (psw != null) {
// remove password files
for (File file : psw) {
file.deleteOnExit();
}
}
}
// test call Submitter form command line
try {
File fCommand = getFileCommand(null);
String[] args = new String[22];
File input = new File(workSpace + File.separator + "input");
if (!input.exists()) {
Assert.assertTrue(input.createNewFile());
}
File outPut = new File(workSpace + File.separator + "output");
FileUtil.fullyDelete(outPut);
args[0] = "-input";
args[1] = input.getAbsolutePath();// "input";
args[2] = "-output";
args[3] = outPut.getAbsolutePath();// "output";
args[4] = "-inputformat";
args[5] = "org.apache.hadoop.mapred.TextInputFormat";
args[6] = "-map";
args[7] = "org.apache.hadoop.mapred.lib.IdentityMapper";
args[8] = "-partitioner";
args[9] = "org.apache.hadoop.mapred.pipes.PipesPartitioner";
args[10] = "-reduce";
args[11] = "org.apache.hadoop.mapred.lib.IdentityReducer";
args[12] = "-writer";
args[13] = "org.apache.hadoop.mapred.TextOutputFormat";
args[14] = "-program";
args[15] = fCommand.getAbsolutePath();// "program";
args[16] = "-reduces";
args[17] = "2";
args[18] = "-lazyOutput";
args[19] = "lazyOutput";
args[20] = "-jobconf";
args[21] = "mapreduce.pipes.isjavarecordwriter=false,mapreduce.pipes.isjavarecordreader=false";
Submitter.main(args);
fail();
} catch (ExitUtil.ExitException e) {
// status should be 0
assertEquals(e.status, 0);
} finally {
System.setOut(oldps);
System.setSecurityManager(securityManager);
}
}
/**
* test org.apache.hadoop.mapred.pipes.PipesReducer
* test the transfer of data: key and value
*
* @throws Exception
*/
@Test
public void testPipesReduser() throws Exception {
File[] psw = cleanTokenPasswordFile();
JobConf conf = new JobConf();
try {
Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>(
"user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
"service"));
conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeReducerStub");
conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());
PipesReducer<BooleanWritable, Text, IntWritable, Text> reducer = new PipesReducer<BooleanWritable, Text, IntWritable, Text>();
reducer.configure(conf);
BooleanWritable bw = new BooleanWritable(true);
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskName);
initStdOut(conf);
conf.setBoolean(MRJobConfig.SKIP_RECORDS, true);
CombineOutputCollector<IntWritable, Text> output = new CombineOutputCollector<IntWritable, Text>(
new Counters.Counter(), new Progress());
Reporter reporter = new TestTaskReporter();
List<Text> texts = new ArrayList<Text>();
texts.add(new Text("first"));
texts.add(new Text("second"));
texts.add(new Text("third"));
reducer.reduce(bw, texts.iterator(), output, reporter);
reducer.close();
String stdOut = readStdOut(conf);
// test data: key
assertTrue(stdOut.contains("reducer key :true"));
// and values
assertTrue(stdOut.contains("reduce value :first"));
assertTrue(stdOut.contains("reduce value :second"));
assertTrue(stdOut.contains("reduce value :third"));
} finally {
if (psw != null) {
// remove password files
for (File file : psw) {
file.deleteOnExit();
}
}
}
}
/**
* test PipesPartitioner
* test set and get data from PipesPartitioner
*/
@Test
public void testPipesPartitioner() {
PipesPartitioner<IntWritable, Text> partitioner = new PipesPartitioner<IntWritable, Text>();
JobConf configuration = new JobConf();
Submitter.getJavaPartitioner(configuration);
partitioner.configure(new JobConf());
IntWritable iw = new IntWritable(4);
// the cache empty
assertEquals(0, partitioner.getPartition(iw, new Text("test"), 2));
// set data into cache
PipesPartitioner.setNextPartition(3);
// get data from cache
assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
}
/**
* clean previous std error and outs
*/
private void initStdOut(JobConf configuration) {
TaskAttemptID taskId = TaskAttemptID.forName(configuration
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
File stdErr = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDERR);
// prepare folder
if (!stdOut.getParentFile().exists()) {
stdOut.getParentFile().mkdirs();
} else { // clean logs
stdOut.deleteOnExit();
stdErr.deleteOnExit();
}
}
private String readStdOut(JobConf conf) throws Exception {
TaskAttemptID taskId = TaskAttemptID.forName(conf
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
return readFile(stdOut);
}
private String readFile(File file) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
InputStream is = new FileInputStream(file);
byte[] buffer = new byte[1024];
int counter = 0;
while ((counter = is.read(buffer)) >= 0) {
out.write(buffer, 0, counter);
}
is.close();
return out.toString();
}
private class Progress implements Progressable {
@Override
public void progress() {
}
}
private File[] cleanTokenPasswordFile() throws Exception {
File[] result = new File[2];
result[0] = new File("./jobTokenPassword");
if (result[0].exists()) {
FileUtil.chmod(result[0].getAbsolutePath(), "700");
assertTrue(result[0].delete());
}
result[1] = new File("./.jobTokenPassword.crc");
if (result[1].exists()) {
FileUtil.chmod(result[1].getAbsolutePath(), "700");
result[1].delete();
}
return result;
}
private File getFileCommand(String clazz) throws Exception {
String classpath = System.getProperty("java.class.path");
File fCommand = new File(workSpace + File.separator + "cache.sh");
fCommand.deleteOnExit();
if (!fCommand.getParentFile().exists()) {
fCommand.getParentFile().mkdirs();
}
fCommand.createNewFile();
OutputStream os = new FileOutputStream(fCommand);
os.write("#!/bin/sh \n".getBytes());
if (clazz == null) {
os.write(("ls ").getBytes());
} else {
os.write(("java -cp " + classpath + " " + clazz).getBytes());
}
os.flush();
os.close();
FileUtil.chmod(fCommand.getAbsolutePath(), "700");
return fCommand;
}
private class CombineOutputCollector<K, V extends Object> implements
OutputCollector<K, V> {
private Writer<K, V> writer;
private Counters.Counter outCounter;
private Progressable progressable;
public CombineOutputCollector(Counters.Counter outCounter,
Progressable progressable) {
this.outCounter = outCounter;
this.progressable = progressable;
}
public synchronized void setWriter(Writer<K, V> writer) {
this.writer = writer;
}
public synchronized void collect(K key, V value) throws IOException {
outCounter.increment(1);
writer.append(key, value);
progressable.progress();
}
}
public static class FakeSplit implements InputSplit {
public void write(DataOutput out) throws IOException {
}
public void readFields(DataInput in) throws IOException {
}
public long getLength() {
return 0L;
}
public String[] getLocations() {
return new String[0];
}
}
private class TestTaskReporter implements Reporter {
private int recordNum = 0; // number of records processed
private String status = null;
private Counters counters = new Counters();
private InputSplit split = new FakeSplit();
@Override
public void progress() {
recordNum++;
}
@Override
public void setStatus(String status) {
this.status = status;
}
public String getStatus() {
return this.status;
}
public Counters.Counter getCounter(String group, String name) {
Counters.Counter counter = null;
if (counters != null) {
counter = counters.findCounter(group, name);
if (counter == null) {
Group grp = counters.addGroup(group, group);
counter = grp.addCounter(name, name, 10);
}
}
return counter;
}
public Counters.Counter getCounter(Enum<?> name) {
return counters == null ? null : counters.findCounter(name);
}
public void incrCounter(Enum<?> key, long amount) {
if (counters != null) {
counters.incrCounter(key, amount);
}
}
public void incrCounter(String group, String counter, long amount) {
if (counters != null) {
counters.incrCounter(group, counter, amount);
}
}
@Override
public InputSplit getInputSplit() throws UnsupportedOperationException {
return split;
}
@Override
public float getProgress() {
return recordNum;
}
}
private class Reader implements RecordReader<FloatWritable, NullWritable> {
private int index = 0;
private FloatWritable progress;
@Override
public boolean next(FloatWritable key, NullWritable value)
throws IOException {
progress = key;
index++;
return index <= 10;
}
@Override
public float getProgress() throws IOException {
return progress.get();
}
@Override
public long getPos() throws IOException {
return index;
}
@Override
public NullWritable createValue() {
return NullWritable.get();
}
@Override
public FloatWritable createKey() {
FloatWritable result = new FloatWritable(index);
return result;
}
@Override
public void close() throws IOException {
}
}
private class ReaderPipesMapRunner implements RecordReader<FloatWritable, NullWritable> {
private int index = 0;
@Override
public boolean next(FloatWritable key, NullWritable value)
throws IOException {
key.set(index++);
return index <= 10;
}
@Override
public float getProgress() throws IOException {
return index;
}
@Override
public long getPos() throws IOException {
return index;
}
@Override
public NullWritable createValue() {
return NullWritable.get();
}
@Override
public FloatWritable createKey() {
FloatWritable result = new FloatWritable(index);
return result;
}
@Override
public void close() throws IOException {
}
}
private class FakeCollector extends
CombineOutputCollector<IntWritable, Text> {
final private Map<IntWritable, Text> collect = new HashMap<IntWritable, Text>();
public FakeCollector(Counter outCounter, Progressable progressable) {
super(outCounter, progressable);
}
@Override
public synchronized void collect(IntWritable key, Text value)
throws IOException {
collect.put(key, value);
super.collect(key, value);
}
public Map<IntWritable, Text> getCollect() {
return collect;
}
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.pipes;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.pipes.TestPipeApplication.FakeSplit;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class TestPipesNonJavaInputFormat {
private static File workSpace = new File("target",
TestPipesNonJavaInputFormat.class.getName() + "-workSpace");
/**
* test PipesNonJavaInputFormat
*/
@Test
public void testFormat() throws IOException {
PipesNonJavaInputFormat inputFormat = new PipesNonJavaInputFormat();
JobConf conf = new JobConf();
Reporter reporter= mock(Reporter.class);
RecordReader<FloatWritable, NullWritable> reader = inputFormat
.getRecordReader(new FakeSplit(), conf, reporter);
assertEquals(0.0f, reader.getProgress(), 0.001);
// input and output files
File input1 = new File(workSpace + File.separator + "input1");
if (!input1.getParentFile().exists()) {
Assert.assertTrue(input1.getParentFile().mkdirs());
}
if (!input1.exists()) {
Assert.assertTrue(input1.createNewFile());
}
File input2 = new File(workSpace + File.separator + "input2");
if (!input2.exists()) {
Assert.assertTrue(input2.createNewFile());
}
// set data for splits
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
input1.getAbsolutePath() + "," + input2.getAbsolutePath());
InputSplit[] splits = inputFormat.getSplits(conf, 2);
assertEquals(2, splits.length);
PipesNonJavaInputFormat.PipesDummyRecordReader dummyRecordReader = new PipesNonJavaInputFormat.PipesDummyRecordReader(
conf, splits[0]);
// empty dummyRecordReader
assertNull(dummyRecordReader.createKey());
assertNull(dummyRecordReader.createValue());
assertEquals(0, dummyRecordReader.getPos());
assertEquals(0.0, dummyRecordReader.getProgress(), 0.001);
// test method next
assertTrue(dummyRecordReader.next(new FloatWritable(2.0f), NullWritable.get()));
assertEquals(2.0, dummyRecordReader.getProgress(), 0.001);
dummyRecordReader.close();
}
}