MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write out text files without separators (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1533624 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f3043374ff
commit
7d637a3a99
|
@ -209,6 +209,9 @@ Release 2.2.1 - UNRELEASED
|
|||
MAPREDUCE-5463. Deprecate SLOTS_MILLIS counters (Tzuyoshi Ozawa via Sandy
|
||||
Ryza)
|
||||
|
||||
MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write
|
||||
out text files without separators (Sandy Ryza)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.streaming.io;
|
||||
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.typedbytes.TypedBytesWritable;
|
||||
|
||||
|
@ -34,6 +35,7 @@ public class IdentifierResolver {
|
|||
public static final String TEXT_ID = "text";
|
||||
public static final String RAW_BYTES_ID = "rawbytes";
|
||||
public static final String TYPED_BYTES_ID = "typedbytes";
|
||||
public static final String KEY_ONLY_TEXT_ID = "keyonlytext";
|
||||
|
||||
private Class<? extends InputWriter> inputWriterClass = null;
|
||||
private Class<? extends OutputReader> outputReaderClass = null;
|
||||
|
@ -55,6 +57,11 @@ public class IdentifierResolver {
|
|||
setOutputReaderClass(TypedBytesOutputReader.class);
|
||||
setOutputKeyClass(TypedBytesWritable.class);
|
||||
setOutputValueClass(TypedBytesWritable.class);
|
||||
} else if (identifier.equalsIgnoreCase(KEY_ONLY_TEXT_ID)) {
|
||||
setInputWriterClass(KeyOnlyTextInputWriter.class);
|
||||
setOutputReaderClass(KeyOnlyTextOutputReader.class);
|
||||
setOutputKeyClass(Text.class);
|
||||
setOutputValueClass(NullWritable.class);
|
||||
} else { // assume TEXT_ID
|
||||
setInputWriterClass(TextInputWriter.class);
|
||||
setOutputReaderClass(TextOutputReader.class);
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.streaming.io;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
||||
public class KeyOnlyTextInputWriter extends TextInputWriter {
|
||||
|
||||
@Override
|
||||
public void writeKey(Object key) throws IOException {
|
||||
writeUTF8(key);
|
||||
clientOut.write('\n');
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeValue(Object value) throws IOException {}
|
||||
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.streaming.io;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.streaming.PipeMapRed;
|
||||
import org.apache.hadoop.util.LineReader;
|
||||
|
||||
/**
|
||||
* OutputReader that reads the client's output as text, interpreting each line
|
||||
* as a key and outputting NullWritables for values.
|
||||
*/
|
||||
public class KeyOnlyTextOutputReader extends OutputReader<Text, NullWritable> {
|
||||
|
||||
private LineReader lineReader;
|
||||
private byte[] bytes;
|
||||
private DataInput clientIn;
|
||||
private Configuration conf;
|
||||
private Text key;
|
||||
private Text line;
|
||||
|
||||
@Override
|
||||
public void initialize(PipeMapRed pipeMapRed) throws IOException {
|
||||
super.initialize(pipeMapRed);
|
||||
clientIn = pipeMapRed.getClientInput();
|
||||
conf = pipeMapRed.getConfiguration();
|
||||
lineReader = new LineReader((InputStream)clientIn, conf);
|
||||
key = new Text();
|
||||
line = new Text();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readKeyValue() throws IOException {
|
||||
if (lineReader.readLine(line) <= 0) {
|
||||
return false;
|
||||
}
|
||||
bytes = line.getBytes();
|
||||
key.set(bytes, 0, line.getLength());
|
||||
|
||||
line.clear();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Text getCurrentKey() throws IOException {
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NullWritable getCurrentValue() throws IOException {
|
||||
return NullWritable.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLastOutput() {
|
||||
if (bytes != null) {
|
||||
try {
|
||||
return new String(bytes, "UTF-8");
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
return "<undecodable>";
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -30,7 +30,7 @@ import org.apache.hadoop.streaming.PipeMapRed;
|
|||
*/
|
||||
public class TextInputWriter extends InputWriter<Object, Object> {
|
||||
|
||||
private DataOutput clientOut;
|
||||
protected DataOutput clientOut;
|
||||
private byte[] inputSeparator;
|
||||
|
||||
@Override
|
||||
|
@ -53,7 +53,7 @@ public class TextInputWriter extends InputWriter<Object, Object> {
|
|||
}
|
||||
|
||||
// Write an object to the output stream using UTF-8 encoding
|
||||
private void writeUTF8(Object object) throws IOException {
|
||||
protected void writeUTF8(Object object) throws IOException {
|
||||
byte[] bval;
|
||||
int valSize;
|
||||
if (object instanceof BytesWritable) {
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.streaming;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestStreamingOutputOnlyKeys extends TestStreaming {
|
||||
|
||||
public TestStreamingOutputOnlyKeys() throws IOException {
|
||||
super();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOutputOnlyKeys() throws Exception {
|
||||
args.add("-jobconf"); args.add("stream.reduce.input" +
|
||||
"=keyonlytext");
|
||||
args.add("-jobconf"); args.add("stream.reduce.output" +
|
||||
"=keyonlytext");
|
||||
super.testCommandLine();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExpectedOutput() {
|
||||
return outputExpect.replaceAll("\t", "");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testCommandLine() {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.streaming.io;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.streaming.PipeMapRed;
|
||||
import org.apache.hadoop.streaming.PipeMapper;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestKeyOnlyTextOutputReader {
|
||||
@Test
|
||||
public void testKeyOnlyTextOutputReader() throws IOException {
|
||||
String text = "key,value\nkey2,value2\nnocomma\n";
|
||||
PipeMapRed pipeMapRed = new MyPipeMapRed(text);
|
||||
KeyOnlyTextOutputReader outputReader = new KeyOnlyTextOutputReader();
|
||||
outputReader.initialize(pipeMapRed);
|
||||
outputReader.readKeyValue();
|
||||
Assert.assertEquals(new Text("key,value"), outputReader.getCurrentKey());
|
||||
outputReader.readKeyValue();
|
||||
Assert.assertEquals(new Text("key2,value2"), outputReader.getCurrentKey());
|
||||
outputReader.readKeyValue();
|
||||
Assert.assertEquals(new Text("nocomma"), outputReader.getCurrentKey());
|
||||
Assert.assertEquals(false, outputReader.readKeyValue());
|
||||
}
|
||||
|
||||
private class MyPipeMapRed extends PipeMapper {
|
||||
private DataInput clientIn;
|
||||
private Configuration conf = new Configuration();
|
||||
|
||||
public MyPipeMapRed(String text) {
|
||||
clientIn = new DataInputStream(new ByteArrayInputStream(text.getBytes()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataInput getClientInput() {
|
||||
return clientIn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue