diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index c5ce17b3882..4b75d6ad8bd 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -175,6 +175,9 @@ Trunk (unreleased changes) HADOOP-8584. test-patch.sh should not immediately exit when no tests are added or modified. (Colin Patrick McCabe via eli) + HADOOP-8521. Port StreamInputFormat to new Map Reduce API (madhukara + phatak via bobby) + OPTIMIZATIONS HADOOP-7761. Improve the performance of raw comparisons. (todd) diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java index dbfa75a855f..85e5ab3e7bd 100644 --- a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java @@ -126,7 +126,7 @@ public abstract class StreamBaseRecordReader implements RecordReader } String unqualSplit = split_.getPath().getName() + ":" + split_.getStart() + "+" + split_.getLength(); - String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " " + unqualSplit + String status = "HSTR " + StreamUtil.getHost() + " " + numRec_ + ". pos=" + pos + " " + unqualSplit + " Processing record=" + recStr; status += " " + splitName_; return status; diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamUtil.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamUtil.java index 7e8d0a2ac44..8dd987e870c 100644 --- a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamUtil.java +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamUtil.java @@ -168,12 +168,17 @@ public class StreamUtil { } static private Environment env; - static String HOST; + private static String host; + public static String getHost(){ + return host; + } + + static { try { env = new Environment(); - HOST = env.getHost(); + host = env.getHost(); } catch (IOException io) { io.printStackTrace(); } diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java index b5327dacf42..7438cb8191a 100644 --- a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java @@ -64,7 +64,7 @@ public class StreamXmlRecordReader extends StreamBaseRecordReader { init(); } - public void init() throws IOException { + public final void init() throws IOException { LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_=" + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > " + in_.getPos()); @@ -185,14 +185,14 @@ public class StreamXmlRecordReader extends StreamBaseRecordReader { } // states - final static int CDATA_IN = 10; - final static int CDATA_OUT = 11; - final static int CDATA_UNK = 12; - final static int RECORD_ACCEPT = 13; + private final static int CDATA_IN = 10; + private final static int CDATA_OUT = 11; + private final static int CDATA_UNK = 12; + private final static int RECORD_ACCEPT = 13; // inputs - final static int CDATA_BEGIN = 20; - final static int CDATA_END = 21; - final static int RECORD_MAYBE = 22; + private final static int CDATA_BEGIN = 20; + private final static int CDATA_END = 21; + private final static int RECORD_MAYBE = 22; /* also updates firstMatchStart_;*/ int nextState(int state, int input, int bufPos) { @@ -293,7 +293,7 @@ public class StreamXmlRecordReader extends StreamBaseRecordReader { BufferedInputStream bin_; // Wrap FSDataInputStream for efficient backward seeks long pos_; // Keep track on position with respect encapsulated FSDataInputStream - final static int NA = -1; + private final static int NA = -1; int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA. int firstMatchEnd_ = 0; diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamBaseRecordReader.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamBaseRecordReader.java new file mode 100644 index 00000000000..d71c20d23a1 --- /dev/null +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamBaseRecordReader.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.streaming.StreamUtil; + +/** + * Shared functionality for hadoopStreaming formats. A custom reader can be + * defined to be a RecordReader with the constructor below and is selected with + * the option bin/hadoopStreaming -inputreader ... + * + * @see StreamXmlRecordReader + */ +public abstract class StreamBaseRecordReader extends RecordReader { + + protected static final Log LOG = LogFactory + .getLog(StreamBaseRecordReader.class.getName()); + + // custom JobConf properties for this class are prefixed with this namespace + final static String CONF_NS = "stream.recordreader."; + + public StreamBaseRecordReader(FSDataInputStream in, FileSplit split, + TaskAttemptContext context, Configuration conf, FileSystem fs) + throws IOException { + in_ = in; + split_ = split; + start_ = split_.getStart(); + length_ = split_.getLength(); + end_ = start_ + length_; + splitName_ = split_.getPath().getName(); + this.context_ = context; + conf_ = conf; + fs_ = fs; + + statusMaxRecordChars_ = conf.getInt(CONF_NS + "statuschars", 200); + } + + // / RecordReader API + + /** + * Read a record. Implementation should call numRecStats at the end + */ + public abstract boolean next(Text key, Text value) throws IOException; + + /** Returns the current position in the input. */ + public synchronized long getPos() throws IOException { + return in_.getPos(); + } + + /** Close this to future operations. */ + public synchronized void close() throws IOException { + in_.close(); + } + + public float getProgress() throws IOException { + if (end_ == start_) { + return 1.0f; + } else { + return ((float) (in_.getPos() - start_)) / ((float) (end_ - start_)); + } + } + + public Text createKey() { + return new Text(); + } + + public Text createValue() { + return new Text(); + } + + // / StreamBaseRecordReader API + + /** + * Implementation should seek forward in_ to the first byte of the next + * record. The initial byte offset in the stream is arbitrary. + */ + public abstract void seekNextRecordBoundary() throws IOException; + + void numRecStats(byte[] record, int start, int len) throws IOException { + numRec_++; + if (numRec_ == nextStatusRec_) { + String recordStr = new String(record, start, Math.min(len, + statusMaxRecordChars_), "UTF-8"); + nextStatusRec_ += 100;// *= 10; + String status = getStatus(recordStr); + LOG.info(status); + context_.setStatus(status); + } + } + + long lastMem = 0; + + String getStatus(CharSequence record) { + long pos = -1; + try { + pos = getPos(); + } catch (IOException io) { + } + String recStr; + if (record.length() > statusMaxRecordChars_) { + recStr = record.subSequence(0, statusMaxRecordChars_) + "..."; + } else { + recStr = record.toString(); + } + String unqualSplit = split_.getPath().getName() + ":" + split_.getStart() + + "+" + split_.getLength(); + String status = "HSTR " + StreamUtil.getHost() + " " + numRec_ + ". pos=" + + pos + " " + unqualSplit + " Processing record=" + recStr; + status += " " + splitName_; + return status; + } + + FSDataInputStream in_; + FileSplit split_; + long start_; + long end_; + long length_; + String splitName_; + TaskAttemptContext context_; + Configuration conf_; + FileSystem fs_; + int numRec_ = 0; + int nextStatusRec_ = 1; + int statusMaxRecordChars_; + +} diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java new file mode 100644 index 00000000000..a77c13762ca --- /dev/null +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming.mapreduce; + +import java.io.IOException; +import java.lang.reflect.Constructor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; +import org.apache.hadoop.streaming.StreamUtil; + +/** + * An input format that selects a RecordReader based on a JobConf property. This + * should be used only for non-standard record reader such as + * StreamXmlRecordReader. For all other standard record readers, the appropriate + * input format classes should be used. + */ +public class StreamInputFormat extends KeyValueTextInputFormat { + + @Override + public RecordReader createRecordReader(InputSplit genericSplit, + TaskAttemptContext context) throws IOException { + + Configuration conf = context.getConfiguration(); + + String c = conf.get("stream.recordreader.class"); + if (c == null || c.indexOf("LineRecordReader") >= 0) { + return super.createRecordReader(genericSplit, context); + } + + // handling non-standard record reader (likely StreamXmlRecordReader) + FileSplit split = (FileSplit) genericSplit; + // LOG.info("getRecordReader start.....split=" + split); + context.setStatus(split.toString()); + context.progress(); + + // Open the file and seek to the start of the split + FileSystem fs = split.getPath().getFileSystem(conf); + FSDataInputStream in = fs.open(split.getPath()); + + // Factory dispatch based on available params.. + Class readerClass; + + { + readerClass = StreamUtil.goodClassOrNull(conf, c, null); + if (readerClass == null) { + throw new RuntimeException("Class not found: " + c); + } + } + Constructor ctor; + + try { + ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class, + FileSplit.class, TaskAttemptContext.class, Configuration.class, + FileSystem.class }); + } catch (NoSuchMethodException nsm) { + throw new RuntimeException(nsm); + } + + RecordReader reader; + try { + reader = (RecordReader) ctor.newInstance(new Object[] { in, + split, context, conf, fs }); + } catch (Exception nsm) { + throw new RuntimeException(nsm); + } + return reader; + + } + +} diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamXmlRecordReader.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamXmlRecordReader.java new file mode 100644 index 00000000000..c7ee847763f --- /dev/null +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamXmlRecordReader.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming.mapreduce; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.streaming.StreamUtil; + +/** + * A way to interpret XML fragments as Mapper input records. Values are XML + * subtrees delimited by configurable tags. Keys could be the value of a certain + * attribute in the XML subtree, but this is left to the stream processor + * application. + * + * The name-value properties that StreamXmlRecordReader understands are: String + * begin (chars marking beginning of record) String end (chars marking end of + * record) int maxrec (maximum record size) int lookahead(maximum lookahead to + * sync CDATA) boolean slowmatch + */ +public class StreamXmlRecordReader extends StreamBaseRecordReader { + + private Text key; + private Text value; + + public StreamXmlRecordReader(FSDataInputStream in, FileSplit split, + TaskAttemptContext context, Configuration conf, FileSystem fs) + throws IOException { + super(in, split, context, conf, fs); + + beginMark_ = checkJobGet(CONF_NS + "begin"); + endMark_ = checkJobGet(CONF_NS + "end"); + + maxRecSize_ = conf_.getInt(CONF_NS + "maxrec", 50 * 1000); + lookAhead_ = conf_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_); + synched_ = false; + + slowMatch_ = conf_.getBoolean(CONF_NS + "slowmatch", false); + if (slowMatch_) { + beginPat_ = makePatternCDataOrMark(beginMark_); + endPat_ = makePatternCDataOrMark(endMark_); + } + init(); + } + + public final void init() throws IOException { + LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + + end_ + " length_=" + length_ + " start_ > in_.getPos() =" + + (start_ > in_.getPos()) + " " + start_ + " > " + in_.getPos()); + if (start_ > in_.getPos()) { + in_.seek(start_); + } + pos_ = start_; + bin_ = new BufferedInputStream(in_); + seekNextRecordBoundary(); + } + + int numNext = 0; + + public synchronized boolean next(Text key, Text value) throws IOException { + numNext++; + if (pos_ >= end_) { + return false; + } + + DataOutputBuffer buf = new DataOutputBuffer(); + if (!readUntilMatchBegin()) { + return false; + } + if (pos_ >= end_ || !readUntilMatchEnd(buf)) { + return false; + } + + // There is only one elem..key/value splitting is not done here. + byte[] record = new byte[buf.getLength()]; + System.arraycopy(buf.getData(), 0, record, 0, record.length); + + numRecStats(record, 0, record.length); + + key.set(record); + value.set(""); + + return true; + } + + public void seekNextRecordBoundary() throws IOException { + readUntilMatchBegin(); + } + + boolean readUntilMatchBegin() throws IOException { + if (slowMatch_) { + return slowReadUntilMatch(beginPat_, false, null); + } else { + return fastReadUntilMatch(beginMark_, false, null); + } + } + + private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException { + if (slowMatch_) { + return slowReadUntilMatch(endPat_, true, buf); + } else { + return fastReadUntilMatch(endMark_, true, buf); + } + } + + private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat, + DataOutputBuffer outBufOrNull) throws IOException { + byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)]; + int read = 0; + bin_.mark(Math.max(lookAhead_, maxRecSize_) + 2); // mark to invalidate if + // we read more + read = bin_.read(buf); + if (read == -1) + return false; + + String sbuf = new String(buf, 0, read, "UTF-8"); + Matcher match = markPattern.matcher(sbuf); + + firstMatchStart_ = NA; + firstMatchEnd_ = NA; + int bufPos = 0; + int state = synched_ ? CDATA_OUT : CDATA_UNK; + int s = 0; + + while (match.find(bufPos)) { + int input; + if (match.group(1) != null) { + input = CDATA_BEGIN; + } else if (match.group(2) != null) { + input = CDATA_END; + firstMatchStart_ = NA; // | ]]> should keep it + } else { + input = RECORD_MAYBE; + } + if (input == RECORD_MAYBE) { + if (firstMatchStart_ == NA) { + firstMatchStart_ = match.start(); + firstMatchEnd_ = match.end(); + } + } + state = nextState(state, input, match.start()); + if (state == RECORD_ACCEPT) { + break; + } + bufPos = match.end(); + s++; + } + if (state != CDATA_UNK) { + synched_ = true; + } + boolean matched = (firstMatchStart_ != NA) + && (state == RECORD_ACCEPT || state == CDATA_UNK); + if (matched) { + int endPos = includePat ? firstMatchEnd_ : firstMatchStart_; + bin_.reset(); + + for (long skiplen = endPos; skiplen > 0;) { + skiplen -= bin_.skip(skiplen); // Skip succeeds as we have read this + // buffer + } + + pos_ += endPos; + if (outBufOrNull != null) { + outBufOrNull.writeBytes(sbuf.substring(0, endPos)); + } + } + return matched; + } + + // states + private final static int CDATA_IN = 10; + private final static int CDATA_OUT = 11; + private final static int CDATA_UNK = 12; + private final static int RECORD_ACCEPT = 13; + // inputs + private final static int CDATA_BEGIN = 20; + private final static int CDATA_END = 21; + private final static int RECORD_MAYBE = 22; + + /* also updates firstMatchStart_; */ + int nextState(int state, int input, int bufPos) { + switch (state) { + case CDATA_UNK: + case CDATA_OUT: + switch (input) { + case CDATA_BEGIN: + return CDATA_IN; + case CDATA_END: + if (state == CDATA_OUT) { + // System.out.println("buggy XML " + bufPos); + } + return CDATA_OUT; + case RECORD_MAYBE: + return (state == CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT; + } + break; + case CDATA_IN: + return (input == CDATA_END) ? CDATA_OUT : CDATA_IN; + } + throw new IllegalStateException(state + " " + input + " " + bufPos + " " + + splitName_); + } + + Pattern makePatternCDataOrMark(String escapedMark) { + StringBuffer pat = new StringBuffer(); + addGroup(pat, StreamUtil.regexpEscape("CDATA[")); // CDATA_BEGIN + addGroup(pat, StreamUtil.regexpEscape("]]>")); // CDATA_END + addGroup(pat, escapedMark); // RECORD_MAYBE + return Pattern.compile(pat.toString()); + } + + void addGroup(StringBuffer pat, String escapedGroup) { + if (pat.length() > 0) { + pat.append("|"); + } + pat.append("("); + pat.append(escapedGroup); + pat.append(")"); + } + + boolean fastReadUntilMatch(String textPat, boolean includePat, + DataOutputBuffer outBufOrNull) throws IOException { + byte[] cpat = textPat.getBytes("UTF-8"); + int m = 0; + boolean match = false; + int msup = cpat.length; + int LL = 120000 * 10; + + bin_.mark(LL); // large number to invalidate mark + while (true) { + int b = bin_.read(); + if (b == -1) + break; + + byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8 + if (c == cpat[m]) { + m++; + if (m == msup) { + match = true; + break; + } + } else { + bin_.mark(LL); // rest mark so we could jump back if we found a match + if (outBufOrNull != null) { + outBufOrNull.write(cpat, 0, m); + outBufOrNull.write(c); + } + pos_ += m + 1; // skip m chars, +1 for 'c' + m = 0; + } + } + if (!includePat && match) { + bin_.reset(); + } else if (outBufOrNull != null) { + outBufOrNull.write(cpat); + pos_ += msup; + } + return match; + } + + String checkJobGet(String prop) throws IOException { + String val = conf_.get(prop); + if (val == null) { + throw new IOException("JobConf: missing required property: " + prop); + } + return val; + } + + String beginMark_; + String endMark_; + + Pattern beginPat_; + Pattern endPat_; + + boolean slowMatch_; + int lookAhead_; // bytes to read to try to synch CDATA/non-CDATA. Should be + // more than max record size + int maxRecSize_; + + BufferedInputStream bin_; // Wrap FSDataInputStream for efficient backward + // seeks + long pos_; // Keep track on position with respect encapsulated + // FSDataInputStream + + private final static int NA = -1; + int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA. + int firstMatchEnd_ = 0; + + boolean synched_; + + @Override + public Text getCurrentKey() throws IOException, InterruptedException { + return key; + } + + @Override + public Text getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public void initialize(InputSplit arg0, TaskAttemptContext arg1) + throws IOException, InterruptedException { + + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + key = createKey(); + value = createValue(); + return next(key, value); + } + +} diff --git a/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/mapreduce/TestStreamXmlRecordReader.java b/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/mapreduce/TestStreamXmlRecordReader.java new file mode 100644 index 00000000000..50f38bd20c2 --- /dev/null +++ b/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/mapreduce/TestStreamXmlRecordReader.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * This class tests StreamXmlRecordReader The test creates an XML file, uses + * StreamXmlRecordReader and compares the expected output against the generated + * output + */ +public class TestStreamXmlRecordReader { + + private File INPUT_FILE; + private String input; + private String outputExpect; + Path OUTPUT_DIR; + FileSystem fs; + + public TestStreamXmlRecordReader() throws IOException { + INPUT_FILE = new File("target/input.xml"); + input = "\t\nroses.are.red\t\nviolets.are.blue\t\n" + + "bunnies.are.pink\t\n\t\n"; + outputExpect = input; + } + + protected void assertOutput(String expectedOutput, String output) + throws IOException { + String[] words = expectedOutput.split("\t\n"); + Set expectedWords = new HashSet(Arrays.asList(words)); + words = output.split("\t\n"); + Set returnedWords = new HashSet(Arrays.asList(words)); + assertTrue(returnedWords.containsAll(expectedWords)); + } + + protected void checkOutput() throws IOException { + File outFile = new File(OUTPUT_DIR.toString()); + Path outPath = new Path(outFile.getAbsolutePath(), "part-r-00000"); + String output = slurpHadoop(outPath, fs); + fs.delete(outPath, true); + outputExpect = "\n" + outputExpect + ""; + System.err.println("outEx1=" + outputExpect); + System.err.println(" out1=" + output); + assertOutput(outputExpect, output); + } + + private String slurpHadoop(Path p, FileSystem fs) throws IOException { + int len = (int) fs.getFileStatus(p).getLen(); + byte[] buf = new byte[len]; + FSDataInputStream in = fs.open(p); + String contents = null; + try { + in.readFully(in.getPos(), buf); + contents = new String(buf, "UTF-8"); + } finally { + in.close(); + } + return contents; + } + + @Before + public void createInput() throws IOException { + FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile()); + String dummyXmlStartTag = "\n"; + String dummyXmlEndTag = "\n"; + out.write(dummyXmlStartTag.getBytes("UTF-8")); + out.write(input.getBytes("UTF-8")); + out.write(dummyXmlEndTag.getBytes("UTF-8")); + out.close(); + } + + @Test + public void testStreamXmlRecordReader() throws Exception { + + Job job = new Job(); + Configuration conf = job.getConfiguration(); + job.setJarByClass(TestStreamXmlRecordReader.class); + job.setMapperClass(Mapper.class); + conf.set("stream.recordreader.class", + "org.apache.hadoop.streaming.mapreduce.StreamXmlRecordReader"); + conf.set("stream.recordreader.begin", ""); + conf.set("stream.recordreader.end", ""); + job.setInputFormatClass(StreamInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + FileInputFormat.addInputPath(job, new Path("target/input.xml")); + OUTPUT_DIR = new Path("target/output"); + fs = FileSystem.get(conf); + if (fs.exists(OUTPUT_DIR)) { + fs.delete(OUTPUT_DIR, true); + } + FileOutputFormat.setOutputPath(job, OUTPUT_DIR); + boolean ret = job.waitForCompletion(true); + + assertEquals(true, ret); + checkOutput(); + + } + + @After + public void tearDown() throws IOException { + fs.delete(OUTPUT_DIR, true); + } + +}