HADOOP-8597. Permit FsShell's text command to read Avro files. Contributed by Ivan Vladimirov.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1383607 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
043a27a3ef
commit
de2efbe3ba
|
@ -218,6 +218,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
NEW FEATURES
|
||||
|
||||
HADOOP-8597. Permit FsShell's text command to read Avro files.
|
||||
(Ivan Vladimirov Ivanov via cutting)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HADOOP-8789. Tests setLevel(Level.OFF) should be Level.ERROR.
|
||||
|
|
|
@ -17,11 +17,21 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs.shell;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.InputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
import org.apache.avro.file.DataFileReader;
|
||||
import org.apache.avro.file.FileReader;
|
||||
import org.apache.avro.generic.GenericDatumReader;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
import org.apache.avro.io.EncoderFactory;
|
||||
import org.apache.avro.io.JsonEncoder;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -37,6 +47,10 @@ import org.apache.hadoop.io.Writable;
|
|||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.codehaus.jackson.JsonEncoding;
|
||||
import org.codehaus.jackson.JsonFactory;
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.util.MinimalPrettyPrinter;
|
||||
|
||||
/**
|
||||
* Display contents of files
|
||||
|
@ -95,14 +109,14 @@ class Display extends FsCommand {
|
|||
|
||||
/**
|
||||
* Same behavior as "-cat", but handles zip and TextRecordInputStream
|
||||
* encodings.
|
||||
* and Avro encodings.
|
||||
*/
|
||||
public static class Text extends Cat {
|
||||
public static final String NAME = "text";
|
||||
public static final String USAGE = Cat.USAGE;
|
||||
public static final String DESCRIPTION =
|
||||
"Takes a source file and outputs the file in text format.\n" +
|
||||
"The allowed formats are zip and TextRecordInputStream.";
|
||||
"The allowed formats are zip and TextRecordInputStream and Avro.";
|
||||
|
||||
@Override
|
||||
protected InputStream getInputStream(PathData item) throws IOException {
|
||||
|
@ -132,6 +146,13 @@ class Display extends FsCommand {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case 0x4f62: { // 'O' 'b'
|
||||
if (i.readByte() == 'j') {
|
||||
i.close();
|
||||
return new AvroFileInputStream(item.stat);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// File is non-compressed, or not a file container we know.
|
||||
|
@ -187,4 +208,68 @@ class Display extends FsCommand {
|
|||
super.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class transforms a binary Avro data file into an InputStream
|
||||
* with data that is in a human readable JSON format.
|
||||
*/
|
||||
protected static class AvroFileInputStream extends InputStream {
|
||||
private int pos;
|
||||
private byte[] buffer;
|
||||
private ByteArrayOutputStream output;
|
||||
private FileReader fileReader;
|
||||
private DatumWriter<Object> writer;
|
||||
private JsonEncoder encoder;
|
||||
|
||||
public AvroFileInputStream(FileStatus status) throws IOException {
|
||||
pos = 0;
|
||||
buffer = new byte[0];
|
||||
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
|
||||
fileReader =
|
||||
DataFileReader.openReader(new File(status.getPath().toUri()), reader);
|
||||
Schema schema = fileReader.getSchema();
|
||||
writer = new GenericDatumWriter<Object>(schema);
|
||||
output = new ByteArrayOutputStream();
|
||||
JsonGenerator generator =
|
||||
new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
|
||||
MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
|
||||
prettyPrinter.setRootValueSeparator(System.getProperty("line.separator"));
|
||||
generator.setPrettyPrinter(prettyPrinter);
|
||||
encoder = EncoderFactory.get().jsonEncoder(schema, generator);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a single byte from the stream.
|
||||
*/
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
if (pos < buffer.length) {
|
||||
return buffer[pos++];
|
||||
}
|
||||
if (!fileReader.hasNext()) {
|
||||
return -1;
|
||||
}
|
||||
writer.write(fileReader.next(), encoder);
|
||||
encoder.flush();
|
||||
if (!fileReader.hasNext()) {
|
||||
// Write a new line after the last Avro record.
|
||||
output.write(System.getProperty("line.separator").getBytes());
|
||||
output.flush();
|
||||
}
|
||||
pos = 0;
|
||||
buffer = output.toByteArray();
|
||||
output.reset();
|
||||
return read();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the stream.
|
||||
*/
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
fileReader.close();
|
||||
output.close();
|
||||
super.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.shell;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This class tests the logic for displaying the binary formats supported
|
||||
* by the Text command.
|
||||
*/
|
||||
public class TestTextCommand {
|
||||
private static final String TEST_ROOT_DIR =
|
||||
System.getProperty("test.build.data", "build/test/data/") + "/testText";
|
||||
private static final String AVRO_FILENAME = TEST_ROOT_DIR + "/weather.avro";
|
||||
|
||||
/**
|
||||
* Tests whether binary Avro data files are displayed correctly.
|
||||
*/
|
||||
@Test
|
||||
public void testDisplayForAvroFiles() throws Exception {
|
||||
// Create a small Avro data file on the local file system.
|
||||
createAvroFile(generateWeatherAvroBinaryData());
|
||||
|
||||
// Prepare and call the Text command's protected getInputStream method
|
||||
// using reflection.
|
||||
Configuration conf = new Configuration();
|
||||
File localPath = new File(AVRO_FILENAME);
|
||||
PathData pathData = new PathData(localPath, conf);
|
||||
Display.Text text = new Display.Text();
|
||||
text.setConf(conf);
|
||||
Method method = text.getClass().getDeclaredMethod(
|
||||
"getInputStream", PathData.class);
|
||||
method.setAccessible(true);
|
||||
InputStream stream = (InputStream) method.invoke(text, pathData);
|
||||
String output = inputStreamToString(stream);
|
||||
|
||||
// Check the output.
|
||||
String expectedOutput =
|
||||
"{\"station\":\"011990-99999\",\"time\":-619524000000,\"temp\":0}" +
|
||||
System.getProperty("line.separator") +
|
||||
"{\"station\":\"011990-99999\",\"time\":-619506000000,\"temp\":22}" +
|
||||
System.getProperty("line.separator") +
|
||||
"{\"station\":\"011990-99999\",\"time\":-619484400000,\"temp\":-11}" +
|
||||
System.getProperty("line.separator") +
|
||||
"{\"station\":\"012650-99999\",\"time\":-655531200000,\"temp\":111}" +
|
||||
System.getProperty("line.separator") +
|
||||
"{\"station\":\"012650-99999\",\"time\":-655509600000,\"temp\":78}" +
|
||||
System.getProperty("line.separator");
|
||||
|
||||
assertEquals(expectedOutput, output);
|
||||
}
|
||||
|
||||
private String inputStreamToString(InputStream stream) throws IOException {
|
||||
StringWriter writer = new StringWriter();
|
||||
IOUtils.copy(stream, writer);
|
||||
return writer.toString();
|
||||
}
|
||||
|
||||
private void createAvroFile(byte[] contents) throws IOException {
|
||||
(new File(TEST_ROOT_DIR)).mkdir();
|
||||
File file = new File(AVRO_FILENAME);
|
||||
file.createNewFile();
|
||||
FileOutputStream stream = new FileOutputStream(file);
|
||||
stream.write(contents);
|
||||
stream.close();
|
||||
}
|
||||
|
||||
private byte[] generateWeatherAvroBinaryData() {
|
||||
// The contents of a simple binary Avro file with weather records.
|
||||
byte[] contents = {
|
||||
(byte) 0x4f, (byte) 0x62, (byte) 0x6a, (byte) 0x1,
|
||||
(byte) 0x4, (byte) 0x14, (byte) 0x61, (byte) 0x76,
|
||||
(byte) 0x72, (byte) 0x6f, (byte) 0x2e, (byte) 0x63,
|
||||
(byte) 0x6f, (byte) 0x64, (byte) 0x65, (byte) 0x63,
|
||||
(byte) 0x8, (byte) 0x6e, (byte) 0x75, (byte) 0x6c,
|
||||
(byte) 0x6c, (byte) 0x16, (byte) 0x61, (byte) 0x76,
|
||||
(byte) 0x72, (byte) 0x6f, (byte) 0x2e, (byte) 0x73,
|
||||
(byte) 0x63, (byte) 0x68, (byte) 0x65, (byte) 0x6d,
|
||||
(byte) 0x61, (byte) 0xf2, (byte) 0x2, (byte) 0x7b,
|
||||
(byte) 0x22, (byte) 0x74, (byte) 0x79, (byte) 0x70,
|
||||
(byte) 0x65, (byte) 0x22, (byte) 0x3a, (byte) 0x22,
|
||||
(byte) 0x72, (byte) 0x65, (byte) 0x63, (byte) 0x6f,
|
||||
(byte) 0x72, (byte) 0x64, (byte) 0x22, (byte) 0x2c,
|
||||
(byte) 0x22, (byte) 0x6e, (byte) 0x61, (byte) 0x6d,
|
||||
(byte) 0x65, (byte) 0x22, (byte) 0x3a, (byte) 0x22,
|
||||
(byte) 0x57, (byte) 0x65, (byte) 0x61, (byte) 0x74,
|
||||
(byte) 0x68, (byte) 0x65, (byte) 0x72, (byte) 0x22,
|
||||
(byte) 0x2c, (byte) 0x22, (byte) 0x6e, (byte) 0x61,
|
||||
(byte) 0x6d, (byte) 0x65, (byte) 0x73, (byte) 0x70,
|
||||
(byte) 0x61, (byte) 0x63, (byte) 0x65, (byte) 0x22,
|
||||
(byte) 0x3a, (byte) 0x22, (byte) 0x74, (byte) 0x65,
|
||||
(byte) 0x73, (byte) 0x74, (byte) 0x22, (byte) 0x2c,
|
||||
(byte) 0x22, (byte) 0x66, (byte) 0x69, (byte) 0x65,
|
||||
(byte) 0x6c, (byte) 0x64, (byte) 0x73, (byte) 0x22,
|
||||
(byte) 0x3a, (byte) 0x5b, (byte) 0x7b, (byte) 0x22,
|
||||
(byte) 0x6e, (byte) 0x61, (byte) 0x6d, (byte) 0x65,
|
||||
(byte) 0x22, (byte) 0x3a, (byte) 0x22, (byte) 0x73,
|
||||
(byte) 0x74, (byte) 0x61, (byte) 0x74, (byte) 0x69,
|
||||
(byte) 0x6f, (byte) 0x6e, (byte) 0x22, (byte) 0x2c,
|
||||
(byte) 0x22, (byte) 0x74, (byte) 0x79, (byte) 0x70,
|
||||
(byte) 0x65, (byte) 0x22, (byte) 0x3a, (byte) 0x22,
|
||||
(byte) 0x73, (byte) 0x74, (byte) 0x72, (byte) 0x69,
|
||||
(byte) 0x6e, (byte) 0x67, (byte) 0x22, (byte) 0x7d,
|
||||
(byte) 0x2c, (byte) 0x7b, (byte) 0x22, (byte) 0x6e,
|
||||
(byte) 0x61, (byte) 0x6d, (byte) 0x65, (byte) 0x22,
|
||||
(byte) 0x3a, (byte) 0x22, (byte) 0x74, (byte) 0x69,
|
||||
(byte) 0x6d, (byte) 0x65, (byte) 0x22, (byte) 0x2c,
|
||||
(byte) 0x22, (byte) 0x74, (byte) 0x79, (byte) 0x70,
|
||||
(byte) 0x65, (byte) 0x22, (byte) 0x3a, (byte) 0x22,
|
||||
(byte) 0x6c, (byte) 0x6f, (byte) 0x6e, (byte) 0x67,
|
||||
(byte) 0x22, (byte) 0x7d, (byte) 0x2c, (byte) 0x7b,
|
||||
(byte) 0x22, (byte) 0x6e, (byte) 0x61, (byte) 0x6d,
|
||||
(byte) 0x65, (byte) 0x22, (byte) 0x3a, (byte) 0x22,
|
||||
(byte) 0x74, (byte) 0x65, (byte) 0x6d, (byte) 0x70,
|
||||
(byte) 0x22, (byte) 0x2c, (byte) 0x22, (byte) 0x74,
|
||||
(byte) 0x79, (byte) 0x70, (byte) 0x65, (byte) 0x22,
|
||||
(byte) 0x3a, (byte) 0x22, (byte) 0x69, (byte) 0x6e,
|
||||
(byte) 0x74, (byte) 0x22, (byte) 0x7d, (byte) 0x5d,
|
||||
(byte) 0x2c, (byte) 0x22, (byte) 0x64, (byte) 0x6f,
|
||||
(byte) 0x63, (byte) 0x22, (byte) 0x3a, (byte) 0x22,
|
||||
(byte) 0x41, (byte) 0x20, (byte) 0x77, (byte) 0x65,
|
||||
(byte) 0x61, (byte) 0x74, (byte) 0x68, (byte) 0x65,
|
||||
(byte) 0x72, (byte) 0x20, (byte) 0x72, (byte) 0x65,
|
||||
(byte) 0x61, (byte) 0x64, (byte) 0x69, (byte) 0x6e,
|
||||
(byte) 0x67, (byte) 0x2e, (byte) 0x22, (byte) 0x7d,
|
||||
(byte) 0x0, (byte) 0xb0, (byte) 0x81, (byte) 0xb3,
|
||||
(byte) 0xc4, (byte) 0xa, (byte) 0xc, (byte) 0xf6,
|
||||
(byte) 0x62, (byte) 0xfa, (byte) 0xc9, (byte) 0x38,
|
||||
(byte) 0xfd, (byte) 0x7e, (byte) 0x52, (byte) 0x0,
|
||||
(byte) 0xa7, (byte) 0xa, (byte) 0xcc, (byte) 0x1,
|
||||
(byte) 0x18, (byte) 0x30, (byte) 0x31, (byte) 0x31,
|
||||
(byte) 0x39, (byte) 0x39, (byte) 0x30, (byte) 0x2d,
|
||||
(byte) 0x39, (byte) 0x39, (byte) 0x39, (byte) 0x39,
|
||||
(byte) 0x39, (byte) 0xff, (byte) 0xa3, (byte) 0x90,
|
||||
(byte) 0xe8, (byte) 0x87, (byte) 0x24, (byte) 0x0,
|
||||
(byte) 0x18, (byte) 0x30, (byte) 0x31, (byte) 0x31,
|
||||
(byte) 0x39, (byte) 0x39, (byte) 0x30, (byte) 0x2d,
|
||||
(byte) 0x39, (byte) 0x39, (byte) 0x39, (byte) 0x39,
|
||||
(byte) 0x39, (byte) 0xff, (byte) 0x81, (byte) 0xfb,
|
||||
(byte) 0xd6, (byte) 0x87, (byte) 0x24, (byte) 0x2c,
|
||||
(byte) 0x18, (byte) 0x30, (byte) 0x31, (byte) 0x31,
|
||||
(byte) 0x39, (byte) 0x39, (byte) 0x30, (byte) 0x2d,
|
||||
(byte) 0x39, (byte) 0x39, (byte) 0x39, (byte) 0x39,
|
||||
(byte) 0x39, (byte) 0xff, (byte) 0xa5, (byte) 0xae,
|
||||
(byte) 0xc2, (byte) 0x87, (byte) 0x24, (byte) 0x15,
|
||||
(byte) 0x18, (byte) 0x30, (byte) 0x31, (byte) 0x32,
|
||||
(byte) 0x36, (byte) 0x35, (byte) 0x30, (byte) 0x2d,
|
||||
(byte) 0x39, (byte) 0x39, (byte) 0x39, (byte) 0x39,
|
||||
(byte) 0x39, (byte) 0xff, (byte) 0xb7, (byte) 0xa2,
|
||||
(byte) 0x8b, (byte) 0x94, (byte) 0x26, (byte) 0xde,
|
||||
(byte) 0x1, (byte) 0x18, (byte) 0x30, (byte) 0x31,
|
||||
(byte) 0x32, (byte) 0x36, (byte) 0x35, (byte) 0x30,
|
||||
(byte) 0x2d, (byte) 0x39, (byte) 0x39, (byte) 0x39,
|
||||
(byte) 0x39, (byte) 0x39, (byte) 0xff, (byte) 0xdb,
|
||||
(byte) 0xd5, (byte) 0xf6, (byte) 0x93, (byte) 0x26,
|
||||
(byte) 0x9c, (byte) 0x1, (byte) 0xb0, (byte) 0x81,
|
||||
(byte) 0xb3, (byte) 0xc4, (byte) 0xa, (byte) 0xc,
|
||||
(byte) 0xf6, (byte) 0x62, (byte) 0xfa, (byte) 0xc9,
|
||||
(byte) 0x38, (byte) 0xfd, (byte) 0x7e, (byte) 0x52,
|
||||
(byte) 0x0, (byte) 0xa7,
|
||||
};
|
||||
|
||||
return contents;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue