diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 4853cf26b80..c302e8f63e2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -685,6 +685,9 @@ Release 0.23.7 - UNRELEASED IMPROVEMENTS + MAPREDUCE-4905. test org.apache.hadoop.mapred.pipes + (Aleksey Gorshkov via bobby) + OPTIMIZATIONS MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java index 57370872e30..8f4259e3ec1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java @@ -58,6 +58,7 @@ import org.apache.hadoop.mapred.lib.LazyOutputFormat; import org.apache.hadoop.mapred.lib.NullOutputFormat; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; @@ -515,7 +516,7 @@ public class Submitter extends Configured implements Tool { */ public static void main(String[] args) throws Exception { int exitCode = new Submitter().run(args); - System.exit(exitCode); + ExitUtil.terminate(exitCode); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/CommonStub.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/CommonStub.java new file mode 100644 index 00000000000..f1f11a17be4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/CommonStub.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.pipes; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +import javax.crypto.SecretKey; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; + +public class CommonStub { + + protected Socket socket = null; + protected DataInputStream dataInput; + protected DataOutputStream dataOut; + + protected String createDigest(byte[] password, String data) throws IOException { + SecretKey key = JobTokenSecretManager.createSecretKey(password); + + return SecureShuffleUtils.hashFromString(data, key); + + } + + protected void readObject(Writable obj, DataInputStream inStream) throws IOException { + int numBytes = WritableUtils.readVInt(inStream); + byte[] buffer; + // For BytesWritable and Text, use the specified length to set the length + // this causes the "obvious" translations to work. So that if you emit + // a string "abc" from C++, it shows up as "abc". + if (obj instanceof BytesWritable) { + buffer = new byte[numBytes]; + inStream.readFully(buffer); + ((BytesWritable) obj).set(buffer, 0, numBytes); + } else if (obj instanceof Text) { + buffer = new byte[numBytes]; + inStream.readFully(buffer); + ((Text) obj).set(buffer); + } else { + obj.readFields(inStream); + } + } + + + protected void writeObject(Writable obj, DataOutputStream stream) + throws IOException { + // For Text and BytesWritable, encode them directly, so that they end up + // in C++ as the natural translations. 
+ DataOutputBuffer buffer = new DataOutputBuffer(); + if (obj instanceof Text) { + Text t = (Text) obj; + int len = t.getLength(); + WritableUtils.writeVLong(stream, len); + stream.flush(); + + stream.write(t.getBytes(), 0, len); + stream.flush(); + + } else if (obj instanceof BytesWritable) { + BytesWritable b = (BytesWritable) obj; + int len = b.getLength(); + WritableUtils.writeVLong(stream, len); + stream.write(b.getBytes(), 0, len); + } else { + buffer.reset(); + obj.write(buffer); + int length = buffer.getLength(); + + WritableUtils.writeVInt(stream, length); + stream.write(buffer.getData(), 0, length); + } + stream.flush(); + + } + + protected void initSoket() throws Exception { + int port = Integer.parseInt(System.getenv("mapreduce.pipes.command.port")); + + java.net.InetAddress address = java.net.InetAddress.getLocalHost(); + + socket = new Socket(address.getHostName(), port); + InputStream input = socket.getInputStream(); + OutputStream output = socket.getOutputStream(); + + // try to read + dataInput = new DataInputStream(input); + + WritableUtils.readVInt(dataInput); + + String str = Text.readString(dataInput); + + Text.readString(dataInput); + + dataOut = new DataOutputStream(output); + WritableUtils.writeVInt(dataOut, 57); + String s = createDigest("password".getBytes(), str); + + Text.writeString(dataOut, s); + + // start + WritableUtils.readVInt(dataInput); + int cuttentAnswer = WritableUtils.readVInt(dataInput); + System.out.println("CURRENT_PROTOCOL_VERSION:" + cuttentAnswer); + + // get configuration + // should be MessageType.SET_JOB_CONF.code + WritableUtils.readVInt(dataInput); + + // array length + + int j = WritableUtils.readVInt(dataInput); + for (int i = 0; i < j; i++) { + Text.readString(dataInput); + i++; + Text.readString(dataInput); + } + } + + protected void closeSoket() { + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationRunnableStub.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationRunnableStub.java new file mode 100644 index 00000000000..c2cc794c56d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationRunnableStub.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.pipes; + + +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; + +/* + Stub for TestPipeApplication test. This stub produced test data for main test. Main test checks data + */ + +public class PipeApplicationRunnableStub extends CommonStub { + + public static void main(String[] args) { + PipeApplicationRunnableStub client = new PipeApplicationRunnableStub(); + client.binaryProtocolStub(); + } + + public void binaryProtocolStub() { + try { + + initSoket(); + System.out.println("start OK"); + + // RUN_MAP.code + // should be 3 + + int answer = WritableUtils.readVInt(dataInput); + System.out.println("RunMap:" + answer); + TestPipeApplication.FakeSplit split = new TestPipeApplication.FakeSplit(); + readObject(split, dataInput); + + WritableUtils.readVInt(dataInput); + WritableUtils.readVInt(dataInput); + // end runMap + // get InputTypes + WritableUtils.readVInt(dataInput); + String inText = Text.readString(dataInput); + System.out.println("Key class:" + inText); + inText = Text.readString(dataInput); + System.out.println("Value class:" + inText); + + @SuppressWarnings("unused") + int inCode = 0; + + // read all data from sender and write to output + while ((inCode = WritableUtils.readVInt(dataInput)) == 4) { + FloatWritable key = new FloatWritable(); + NullWritable value = NullWritable.get(); + readObject(key, dataInput); + System.out.println("value:" + key.get()); + readObject(value, dataInput); + } + + WritableUtils.writeVInt(dataOut, 54); + + dataOut.flush(); + dataOut.close(); + + } catch (Exception x) { + x.printStackTrace(); + } finally { + closeSoket(); + } + + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationStub.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationStub.java new file mode 100644 index 00000000000..33e0b813eb3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationStub.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.pipes; + + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +/* +Stub for TestPipeApplication test. This stub produced test data for main test. 
Main test checks data + */ + +public class PipeApplicationStub extends CommonStub { + + public static void main(String[] args) { + PipeApplicationStub client = new PipeApplicationStub(); + client.binaryProtocolStub(); + } + + public void binaryProtocolStub() { + try { + + initSoket(); + + // output code + WritableUtils.writeVInt(dataOut, 50); + IntWritable wt = new IntWritable(); + wt.set(123); + writeObject(wt, dataOut); + writeObject(new Text("value"), dataOut); + + // PARTITIONED_OUTPUT + WritableUtils.writeVInt(dataOut, 51); + WritableUtils.writeVInt(dataOut, 0); + writeObject(wt, dataOut); + writeObject(new Text("value"), dataOut); + + + // STATUS + WritableUtils.writeVInt(dataOut, 52); + Text.writeString(dataOut, "PROGRESS"); + dataOut.flush(); + + // progress + WritableUtils.writeVInt(dataOut, 53); + dataOut.writeFloat(0.55f); + // register counter + WritableUtils.writeVInt(dataOut, 55); + // id + WritableUtils.writeVInt(dataOut, 0); + Text.writeString(dataOut, "group"); + Text.writeString(dataOut, "name"); + // increment counter + WritableUtils.writeVInt(dataOut, 56); + WritableUtils.writeVInt(dataOut, 0); + + WritableUtils.writeVLong(dataOut, 2); + + // map item + int intValue = WritableUtils.readVInt(dataInput); + System.out.println("intValue:" + intValue); + IntWritable iw = new IntWritable(); + readObject(iw, dataInput); + System.out.println("key:" + iw.get()); + Text txt = new Text(); + readObject(txt, dataInput); + System.out.println("value:" + txt.toString()); + + // done + // end of session + WritableUtils.writeVInt(dataOut, 54); + + System.out.println("finish"); + dataOut.flush(); + dataOut.close(); + + } catch (Exception x) { + x.printStackTrace(); + } finally { + closeSoket(); + } + } + + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeReducerStub.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeReducerStub.java new file mode 100644 index 00000000000..a0afdbc9477 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeReducerStub.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.pipes; + + +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; + +/* +Stub for TestPipeApplication test. This stub produced test data for main test. 
Main test checks data + */ + +public class PipeReducerStub extends CommonStub { + + public static void main(String[] args) { + PipeReducerStub client = new PipeReducerStub(); + client.binaryProtocolStub(); + } + + public void binaryProtocolStub() { + try { + + initSoket(); + + //should be 5 + //RUN_REDUCE boolean + WritableUtils.readVInt(dataInput); + WritableUtils.readVInt(dataInput); + int intValue = WritableUtils.readVInt(dataInput); + System.out.println("getIsJavaRecordWriter:" + intValue); + + // reduce key + WritableUtils.readVInt(dataInput); + // value of reduce key + BooleanWritable value = new BooleanWritable(); + readObject(value, dataInput); + System.out.println("reducer key :" + value); + // reduce value code: + + // reduce values + while ((intValue = WritableUtils.readVInt(dataInput)) == 7) { + Text txt = new Text(); + // value + readObject(txt, dataInput); + System.out.println("reduce value :" + txt); + } + + + // done + WritableUtils.writeVInt(dataOut, 54); + + dataOut.flush(); + dataOut.close(); + + } catch (Exception x) { + x.printStackTrace(); + } finally { + closeSoket(); + + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java new file mode 100644 index 00000000000..aa345c42c72 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java @@ -0,0 +1,747 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestPipeApplication {
+  private static File workSpace = new File("target",
+      TestPipeApplication.class.getName() + "-workSpace");
+
+  private static String taskName = "attempt_001_02_r03_04_05";
+
+  /**
+   * test PipesMapRunner: test the transfer of data from the reader
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRunner() throws Exception {
+
+    // clean old password files
+    File[] psw = cleanTokenPasswordFile();
+    try {
+      RecordReader<FloatWritable, NullWritable> rReader = new ReaderPipesMapRunner();
+      JobConf conf = new JobConf();
+      conf.set(Submitter.IS_JAVA_RR, "true");
+      // for stdout and stderr
+
+      conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskName);
+
+      CombineOutputCollector<IntWritable, Text> output = new CombineOutputCollector<IntWritable, Text>(
+          new Counters.Counter(), new Progress());
+      FileSystem fs = new RawLocalFileSystem();
+      fs.setConf(conf);
+      Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
+          new Path(workSpace + File.separator + "outfile"), IntWritable.class,
+          Text.class, null, null);
+      output.setWriter(wr);
+      // stub for client
+      File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub");
+
+      conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());
+      // token for authorization
+      Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>(
+          "user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
+              "service"));
+      conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
+      conf.setBoolean(MRJobConfig.SKIP_RECORDS, true);
+      TestTaskReporter reporter = new TestTaskReporter();
+      PipesMapRunner<FloatWritable, NullWritable, IntWritable, Text> runner = new PipesMapRunner<FloatWritable, NullWritable, IntWritable, Text>();
+
+      initStdOut(conf);
+
+      runner.configure(conf);
+      runner.run(rReader, output, reporter);
+
+      String stdOut = readStdOut(conf);
+
+      // test part of translated data.
As common file for client and test - + // clients stdOut + // check version + assertTrue(stdOut.contains("CURRENT_PROTOCOL_VERSION:0")); + // check key and value classes + assertTrue(stdOut + .contains("Key class:org.apache.hadoop.io.FloatWritable")); + assertTrue(stdOut + .contains("Value class:org.apache.hadoop.io.NullWritable")); + // test have sent all data from reader + assertTrue(stdOut.contains("value:0.0")); + assertTrue(stdOut.contains("value:9.0")); + + } finally { + if (psw != null) { + // remove password files + for (File file : psw) { + file.deleteOnExit(); + } + } + + } + } + + /** + * test org.apache.hadoop.mapred.pipes.Application + * test a internal functions: MessageType.REGISTER_COUNTER, INCREMENT_COUNTER, STATUS, PROGRESS... + * + * @throws Throwable + */ + + @Test + public void testApplication() throws Throwable { + JobConf conf = new JobConf(); + + RecordReader rReader = new Reader(); + + // client for test + File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationStub"); + + TestTaskReporter reporter = new TestTaskReporter(); + + File[] psw = cleanTokenPasswordFile(); + try { + + conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskName); + conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath()); + + // token for authorization + Token token = new Token( + "user".getBytes(), "password".getBytes(), new Text("kind"), new Text( + "service")); + + conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token); + FakeCollector output = new FakeCollector(new Counters.Counter(), + new Progress()); + FileSystem fs = new RawLocalFileSystem(); + fs.setConf(conf); + Writer wr = new Writer(conf, fs, + new Path(workSpace.getAbsolutePath() + File.separator + "outfile"), + IntWritable.class, Text.class, null, null); + output.setWriter(wr); + conf.set(Submitter.PRESERVE_COMMANDFILE, "true"); + + Application, Writable, IntWritable, Text> application = new Application, Writable, IntWritable, Text>( + conf, rReader, output, reporter, IntWritable.class, Text.class); + application.getDownlink().flush(); + + application.getDownlink().mapItem(new IntWritable(3), new Text("txt")); + + application.getDownlink().flush(); + + application.waitForFinish(); + + wr.close(); + + // test getDownlink().mapItem(); + String stdOut = readStdOut(conf); + assertTrue(stdOut.contains("key:3")); + assertTrue(stdOut.contains("value:txt")); + + // reporter test counter, and status should be sended + // test MessageType.REGISTER_COUNTER and INCREMENT_COUNTER + assertEquals(1.0, reporter.getProgress(), 0.01); + assertNotNull(reporter.getCounter("group", "name")); + // test status MessageType.STATUS + assertEquals(reporter.getStatus(), "PROGRESS"); + stdOut = readFile(new File(workSpace.getAbsolutePath() + File.separator + + "outfile")); + // check MessageType.PROGRESS + assertEquals(0.55f, rReader.getProgress(), 0.001); + application.getDownlink().close(); + // test MessageType.OUTPUT + Entry entry = output.getCollect().entrySet() + .iterator().next(); + assertEquals(123, entry.getKey().get()); + assertEquals("value", entry.getValue().toString()); + try { + // try to abort + application.abort(new Throwable()); + fail(); + } catch (IOException e) { + // abort works ? 
+ assertEquals("pipe child exception", e.getMessage()); + } + } finally { + if (psw != null) { + // remove password files + for (File file : psw) { + file.deleteOnExit(); + } + } + } + } + + /** + * test org.apache.hadoop.mapred.pipes.Submitter + * + * @throws Exception + */ + @Test + public void testSubmitter() throws Exception { + + JobConf conf = new JobConf(); + + File[] psw = cleanTokenPasswordFile(); + + System.setProperty("test.build.data", + "target/tmp/build/TEST_SUBMITTER_MAPPER/data"); + conf.set("hadoop.log.dir", "target/tmp"); + + // prepare configuration + Submitter.setIsJavaMapper(conf, false); + Submitter.setIsJavaReducer(conf, false); + Submitter.setKeepCommandFile(conf, false); + Submitter.setIsJavaRecordReader(conf, false); + Submitter.setIsJavaRecordWriter(conf, false); + PipesPartitioner partitioner = new PipesPartitioner(); + partitioner.configure(conf); + + Submitter.setJavaPartitioner(conf, partitioner.getClass()); + + assertEquals(PipesPartitioner.class, (Submitter.getJavaPartitioner(conf))); + // test going to call main method with System.exit(). Change Security + SecurityManager securityManager = System.getSecurityManager(); + // store System.out + PrintStream oldps = System.out; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ExitUtil.disableSystemExit(); + // test without parameters + try { + System.setOut(new PrintStream(out)); + Submitter.main(new String[0]); + fail(); + } catch (ExitUtil.ExitException e) { + // System.exit prohibited! output message test + assertTrue(out.toString().contains("")); + assertTrue(out.toString().contains("bin/hadoop pipes")); + assertTrue(out.toString().contains("[-input ] // Input directory")); + assertTrue(out.toString() + .contains("[-output ] // Output directory")); + assertTrue(out.toString().contains("[-jar // jar filename")); + assertTrue(out.toString().contains( + "[-inputformat ] // InputFormat class")); + assertTrue(out.toString().contains("[-map ] // Java Map class")); + assertTrue(out.toString().contains( + "[-partitioner ] // Java Partitioner")); + assertTrue(out.toString().contains( + "[-reduce ] // Java Reduce class")); + assertTrue(out.toString().contains( + "[-writer ] // Java RecordWriter")); + assertTrue(out.toString().contains( + "[-program ] // executable URI")); + assertTrue(out.toString().contains( + "[-reduces ] // number of reduces")); + assertTrue(out.toString().contains( + "[-lazyOutput ] // createOutputLazily")); + + assertTrue(out + .toString() + .contains( + "-conf specify an application configuration file")); + assertTrue(out.toString().contains( + "-D use value for given property")); + assertTrue(out.toString().contains( + "-fs specify a namenode")); + assertTrue(out.toString().contains( + "-jt specify a job tracker")); + assertTrue(out + .toString() + .contains( + "-files specify comma separated files to be copied to the map reduce cluster")); + assertTrue(out + .toString() + .contains( + "-libjars specify comma separated jar files to include in the classpath.")); + assertTrue(out + .toString() + .contains( + "-archives specify comma separated archives to be unarchived on the compute machines.")); + } finally { + System.setOut(oldps); + // restore + System.setSecurityManager(securityManager); + if (psw != null) { + // remove password files + for (File file : psw) { + file.deleteOnExit(); + } + } + } + // test call Submitter form command line + try { + File fCommand = getFileCommand(null); + String[] args = new String[22]; + File input = new File(workSpace + File.separator + "input"); 
+      if (!input.exists()) {
+        Assert.assertTrue(input.createNewFile());
+      }
+      File outPut = new File(workSpace + File.separator + "output");
+      FileUtil.fullyDelete(outPut);
+
+      args[0] = "-input";
+      args[1] = input.getAbsolutePath();// "input";
+      args[2] = "-output";
+      args[3] = outPut.getAbsolutePath();// "output";
+      args[4] = "-inputformat";
+      args[5] = "org.apache.hadoop.mapred.TextInputFormat";
+      args[6] = "-map";
+      args[7] = "org.apache.hadoop.mapred.lib.IdentityMapper";
+      args[8] = "-partitioner";
+      args[9] = "org.apache.hadoop.mapred.pipes.PipesPartitioner";
+      args[10] = "-reduce";
+      args[11] = "org.apache.hadoop.mapred.lib.IdentityReducer";
+      args[12] = "-writer";
+      args[13] = "org.apache.hadoop.mapred.TextOutputFormat";
+      args[14] = "-program";
+      args[15] = fCommand.getAbsolutePath();// "program";
+      args[16] = "-reduces";
+      args[17] = "2";
+      args[18] = "-lazyOutput";
+      args[19] = "lazyOutput";
+      args[20] = "-jobconf";
+      args[21] = "mapreduce.pipes.isjavarecordwriter=false,mapreduce.pipes.isjavarecordreader=false";
+
+      Submitter.main(args);
+      fail();
+    } catch (ExitUtil.ExitException e) {
+      // status should be 0
+      assertEquals(e.status, 0);
+
+    } finally {
+      System.setOut(oldps);
+      System.setSecurityManager(securityManager);
+    }
+
+  }
+
+  /**
+   * test org.apache.hadoop.mapred.pipes.PipesReducer
+   * test the transfer of data: key and value
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPipesReducer() throws Exception {
+
+    File[] psw = cleanTokenPasswordFile();
+    JobConf conf = new JobConf();
+    try {
+      Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>(
+          "user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
+              "service"));
+      conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
+
+      File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeReducerStub");
+      conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());
+
+      PipesReducer<BooleanWritable, Text, IntWritable, Text> reducer = new PipesReducer<BooleanWritable, Text, IntWritable, Text>();
+      reducer.configure(conf);
+      BooleanWritable bw = new BooleanWritable(true);
+
+      conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskName);
+      initStdOut(conf);
+      conf.setBoolean(MRJobConfig.SKIP_RECORDS, true);
+      CombineOutputCollector<IntWritable, Text> output = new CombineOutputCollector<IntWritable, Text>(
+          new Counters.Counter(), new Progress());
+      Reporter reporter = new TestTaskReporter();
+      List<Text> texts = new ArrayList<Text>();
+      texts.add(new Text("first"));
+      texts.add(new Text("second"));
+      texts.add(new Text("third"));
+
+      reducer.reduce(bw, texts.iterator(), output, reporter);
+      reducer.close();
+      String stdOut = readStdOut(conf);
+      // test data: key
+      assertTrue(stdOut.contains("reducer key :true"));
+      // and values
+      assertTrue(stdOut.contains("reduce value :first"));
+      assertTrue(stdOut.contains("reduce value :second"));
+      assertTrue(stdOut.contains("reduce value :third"));
+
+    } finally {
+      if (psw != null) {
+        // remove password files
+        for (File file : psw) {
+          file.deleteOnExit();
+        }
+      }
+    }
+
+  }
+
+  /**
+   * test PipesPartitioner
+   * test set and get data from PipesPartitioner
+   */
+  @Test
+  public void testPipesPartitioner() {
+
+    PipesPartitioner<IntWritable, Text> partitioner = new PipesPartitioner<IntWritable, Text>();
+    JobConf configuration = new JobConf();
+    Submitter.getJavaPartitioner(configuration);
+    partitioner.configure(new JobConf());
+    IntWritable iw = new IntWritable(4);
+    // the cache is empty
+    assertEquals(0, partitioner.getPartition(iw, new Text("test"), 2));
+    // set data into cache
+    PipesPartitioner.setNextPartition(3);
+    // get data from cache
+    assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
+  }
+
+  /**
+   * clean previous std
error and outs + */ + + private void initStdOut(JobConf configuration) { + TaskAttemptID taskId = TaskAttemptID.forName(configuration + .get(MRJobConfig.TASK_ATTEMPT_ID)); + File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT); + File stdErr = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDERR); + // prepare folder + if (!stdOut.getParentFile().exists()) { + stdOut.getParentFile().mkdirs(); + } else { // clean logs + stdOut.deleteOnExit(); + stdErr.deleteOnExit(); + } + } + + private String readStdOut(JobConf conf) throws Exception { + TaskAttemptID taskId = TaskAttemptID.forName(conf + .get(MRJobConfig.TASK_ATTEMPT_ID)); + File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT); + + return readFile(stdOut); + + } + + private String readFile(File file) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + InputStream is = new FileInputStream(file); + byte[] buffer = new byte[1024]; + int counter = 0; + while ((counter = is.read(buffer)) >= 0) { + out.write(buffer, 0, counter); + } + + is.close(); + + return out.toString(); + + } + + private class Progress implements Progressable { + + @Override + public void progress() { + + } + + } + + private File[] cleanTokenPasswordFile() throws Exception { + File[] result = new File[2]; + result[0] = new File("./jobTokenPassword"); + if (result[0].exists()) { + FileUtil.chmod(result[0].getAbsolutePath(), "700"); + assertTrue(result[0].delete()); + } + result[1] = new File("./.jobTokenPassword.crc"); + if (result[1].exists()) { + FileUtil.chmod(result[1].getAbsolutePath(), "700"); + result[1].delete(); + } + return result; + } + + private File getFileCommand(String clazz) throws Exception { + String classpath = System.getProperty("java.class.path"); + File fCommand = new File(workSpace + File.separator + "cache.sh"); + fCommand.deleteOnExit(); + if (!fCommand.getParentFile().exists()) { + fCommand.getParentFile().mkdirs(); + } + fCommand.createNewFile(); + OutputStream os = new FileOutputStream(fCommand); + os.write("#!/bin/sh \n".getBytes()); + if (clazz == null) { + os.write(("ls ").getBytes()); + } else { + os.write(("java -cp " + classpath + " " + clazz).getBytes()); + } + os.flush(); + os.close(); + FileUtil.chmod(fCommand.getAbsolutePath(), "700"); + return fCommand; + } + + private class CombineOutputCollector implements + OutputCollector { + private Writer writer; + private Counters.Counter outCounter; + private Progressable progressable; + + public CombineOutputCollector(Counters.Counter outCounter, + Progressable progressable) { + this.outCounter = outCounter; + this.progressable = progressable; + } + + public synchronized void setWriter(Writer writer) { + this.writer = writer; + } + + public synchronized void collect(K key, V value) throws IOException { + outCounter.increment(1); + writer.append(key, value); + progressable.progress(); + } + } + + public static class FakeSplit implements InputSplit { + public void write(DataOutput out) throws IOException { + } + + public void readFields(DataInput in) throws IOException { + } + + public long getLength() { + return 0L; + } + + public String[] getLocations() { + return new String[0]; + } + } + + private class TestTaskReporter implements Reporter { + private int recordNum = 0; // number of records processed + private String status = null; + private Counters counters = new Counters(); + private InputSplit split = new FakeSplit(); + + @Override + public void progress() { + + recordNum++; + } + + @Override + public void 
setStatus(String status) { + this.status = status; + + } + + public String getStatus() { + return this.status; + + } + + public Counters.Counter getCounter(String group, String name) { + Counters.Counter counter = null; + if (counters != null) { + counter = counters.findCounter(group, name); + if (counter == null) { + Group grp = counters.addGroup(group, group); + counter = grp.addCounter(name, name, 10); + } + } + return counter; + } + + public Counters.Counter getCounter(Enum name) { + return counters == null ? null : counters.findCounter(name); + } + + public void incrCounter(Enum key, long amount) { + if (counters != null) { + counters.incrCounter(key, amount); + } + } + + public void incrCounter(String group, String counter, long amount) { + + if (counters != null) { + counters.incrCounter(group, counter, amount); + } + + } + + @Override + public InputSplit getInputSplit() throws UnsupportedOperationException { + return split; + } + + @Override + public float getProgress() { + return recordNum; + } + + } + + private class Reader implements RecordReader { + private int index = 0; + private FloatWritable progress; + + @Override + public boolean next(FloatWritable key, NullWritable value) + throws IOException { + progress = key; + index++; + return index <= 10; + } + + @Override + public float getProgress() throws IOException { + return progress.get(); + } + + @Override + public long getPos() throws IOException { + + return index; + } + + @Override + public NullWritable createValue() { + + return NullWritable.get(); + } + + @Override + public FloatWritable createKey() { + FloatWritable result = new FloatWritable(index); + return result; + } + + @Override + public void close() throws IOException { + + } + } + + + private class ReaderPipesMapRunner implements RecordReader { + private int index = 0; + + @Override + public boolean next(FloatWritable key, NullWritable value) + throws IOException { + key.set(index++); + return index <= 10; + } + + @Override + public float getProgress() throws IOException { + return index; + } + + @Override + public long getPos() throws IOException { + + return index; + } + + @Override + public NullWritable createValue() { + + return NullWritable.get(); + } + + @Override + public FloatWritable createKey() { + FloatWritable result = new FloatWritable(index); + return result; + } + + @Override + public void close() throws IOException { + + } + } + + private class FakeCollector extends + CombineOutputCollector { + + final private Map collect = new HashMap(); + + public FakeCollector(Counter outCounter, Progressable progressable) { + super(outCounter, progressable); + } + + @Override + public synchronized void collect(IntWritable key, Text value) + throws IOException { + collect.put(key, value); + super.collect(key, value); + } + + public Map getCollect() { + return collect; + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipesNonJavaInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipesNonJavaInputFormat.java new file mode 100644 index 00000000000..b3277854d18 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipesNonJavaInputFormat.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.pipes; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.pipes.TestPipeApplication.FakeSplit; +import org.junit.Assert; +import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +public class TestPipesNonJavaInputFormat { + private static File workSpace = new File("target", + TestPipesNonJavaInputFormat.class.getName() + "-workSpace"); + + /** + * test PipesNonJavaInputFormat + */ + + @Test + public void testFormat() throws IOException { + + PipesNonJavaInputFormat inputFormat = new PipesNonJavaInputFormat(); + JobConf conf = new JobConf(); + + Reporter reporter= mock(Reporter.class); + RecordReader reader = inputFormat + .getRecordReader(new FakeSplit(), conf, reporter); + assertEquals(0.0f, reader.getProgress(), 0.001); + + // input and output files + File input1 = new File(workSpace + File.separator + "input1"); + if (!input1.getParentFile().exists()) { + Assert.assertTrue(input1.getParentFile().mkdirs()); + } + + if (!input1.exists()) { + Assert.assertTrue(input1.createNewFile()); + } + + File input2 = new File(workSpace + File.separator + "input2"); + if (!input2.exists()) { + Assert.assertTrue(input2.createNewFile()); + } + // set data for splits + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + input1.getAbsolutePath() + "," + input2.getAbsolutePath()); + InputSplit[] splits = inputFormat.getSplits(conf, 2); + assertEquals(2, splits.length); + + PipesNonJavaInputFormat.PipesDummyRecordReader dummyRecordReader = new PipesNonJavaInputFormat.PipesDummyRecordReader( + conf, splits[0]); + // empty dummyRecordReader + assertNull(dummyRecordReader.createKey()); + assertNull(dummyRecordReader.createValue()); + assertEquals(0, dummyRecordReader.getPos()); + assertEquals(0.0, dummyRecordReader.getProgress(), 0.001); + // test method next + assertTrue(dummyRecordReader.next(new FloatWritable(2.0f), NullWritable.get())); + assertEquals(2.0, dummyRecordReader.getProgress(), 0.001); + dummyRecordReader.close(); + } + +}
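The Submitter.java hunk above swaps System.exit() for ExitUtil.terminate() so that testSubmitter() can invoke Submitter.main() in-process and assert on the exit status instead of letting the JVM die. A minimal sketch of that test pattern, using only the ExitUtil calls already exercised by this patch (the DemoTool class and its exit codes are illustrative, not part of the patch):

  import org.apache.hadoop.util.ExitUtil;

  public class DemoTool {
    public static void main(String[] args) {
      // Once a test has called ExitUtil.disableSystemExit(), terminate()
      // throws ExitUtil.ExitException instead of killing the JVM.
      ExitUtil.terminate(args.length == 0 ? 1 : 0);
    }
  }

  // In a JUnit test, mirroring TestPipeApplication#testSubmitter():
  //   ExitUtil.disableSystemExit();
  //   try {
  //     DemoTool.main(new String[0]);
  //     fail("expected ExitException");
  //   } catch (ExitUtil.ExitException e) {
  //     assertEquals(1, e.status);
  //   }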