diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 5114027baea..e961f6c9dbf 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -6,6 +6,9 @@ Release 2.0.3-alpha - Unreleased NEW FEATURES + HADOOP-8597. Permit FsShell's text command to read Avro files. + (Ivan Vladimirov Ivanov via cutting) + IMPROVEMENTS HADOOP-8789. Tests setLevel(Level.OFF) should be Level.ERROR. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java index d80b2d6686a..036d8c99177 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java @@ -17,11 +17,21 @@ */ package org.apache.hadoop.fs.shell; -import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.InputStream; +import java.io.IOException; import java.util.LinkedList; import java.util.zip.GZIPInputStream; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.FileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.io.JsonEncoder; +import org.apache.avro.Schema; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -38,6 +48,10 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.util.ReflectionUtils; +import org.codehaus.jackson.JsonEncoding; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.util.MinimalPrettyPrinter; /** * Display contents of files @@ -96,14 +110,14 @@ class Display extends FsCommand { /** * Same behavior as "-cat", but handles zip and TextRecordInputStream - * encodings. + * and Avro encodings. */ public static class Text extends Cat { public static final String NAME = "text"; public static final String USAGE = Cat.USAGE; public static final String DESCRIPTION = "Takes a source file and outputs the file in text format.\n" + - "The allowed formats are zip and TextRecordInputStream."; + "The allowed formats are zip and TextRecordInputStream and Avro."; @Override protected InputStream getInputStream(PathData item) throws IOException { @@ -133,6 +147,13 @@ class Display extends FsCommand { } break; } + case 0x4f62: { // 'O' 'b' + if (i.readByte() == 'j') { + i.close(); + return new AvroFileInputStream(item.stat); + } + break; + } } // File is non-compressed, or not a file container we know. @@ -188,4 +209,68 @@ class Display extends FsCommand { super.close(); } } + + /** + * This class transforms a binary Avro data file into an InputStream + * with data that is in a human readable JSON format. + */ + protected static class AvroFileInputStream extends InputStream { + private int pos; + private byte[] buffer; + private ByteArrayOutputStream output; + private FileReader fileReader; + private DatumWriter writer; + private JsonEncoder encoder; + + public AvroFileInputStream(FileStatus status) throws IOException { + pos = 0; + buffer = new byte[0]; + GenericDatumReader reader = new GenericDatumReader(); + fileReader = + DataFileReader.openReader(new File(status.getPath().toUri()), reader); + Schema schema = fileReader.getSchema(); + writer = new GenericDatumWriter(schema); + output = new ByteArrayOutputStream(); + JsonGenerator generator = + new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8); + MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter(); + prettyPrinter.setRootValueSeparator(System.getProperty("line.separator")); + generator.setPrettyPrinter(prettyPrinter); + encoder = EncoderFactory.get().jsonEncoder(schema, generator); + } + + /** + * Read a single byte from the stream. + */ + @Override + public int read() throws IOException { + if (pos < buffer.length) { + return buffer[pos++]; + } + if (!fileReader.hasNext()) { + return -1; + } + writer.write(fileReader.next(), encoder); + encoder.flush(); + if (!fileReader.hasNext()) { + // Write a new line after the last Avro record. + output.write(System.getProperty("line.separator").getBytes()); + output.flush(); + } + pos = 0; + buffer = output.toByteArray(); + output.reset(); + return read(); + } + + /** + * Close the stream. + */ + @Override + public void close() throws IOException { + fileReader.close(); + output.close(); + super.close(); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestTextCommand.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestTextCommand.java new file mode 100644 index 00000000000..99c1ae7b046 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestTextCommand.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.shell; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.IOException; +import java.io.StringWriter; +import java.lang.reflect.Method; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +/** + * This class tests the logic for displaying the binary formats supported + * by the Text command. + */ +public class TestTextCommand { + private static final String TEST_ROOT_DIR = + System.getProperty("test.build.data", "build/test/data/") + "/testText"; + private static final String AVRO_FILENAME = TEST_ROOT_DIR + "/weather.avro"; + + /** + * Tests whether binary Avro data files are displayed correctly. + */ + @Test + public void testDisplayForAvroFiles() throws Exception { + // Create a small Avro data file on the local file system. + createAvroFile(generateWeatherAvroBinaryData()); + + // Prepare and call the Text command's protected getInputStream method + // using reflection. + Configuration conf = new Configuration(); + File localPath = new File(AVRO_FILENAME); + PathData pathData = new PathData(localPath, conf); + Display.Text text = new Display.Text(); + text.setConf(conf); + Method method = text.getClass().getDeclaredMethod( + "getInputStream", PathData.class); + method.setAccessible(true); + InputStream stream = (InputStream) method.invoke(text, pathData); + String output = inputStreamToString(stream); + + // Check the output. + String expectedOutput = + "{\"station\":\"011990-99999\",\"time\":-619524000000,\"temp\":0}" + + System.getProperty("line.separator") + + "{\"station\":\"011990-99999\",\"time\":-619506000000,\"temp\":22}" + + System.getProperty("line.separator") + + "{\"station\":\"011990-99999\",\"time\":-619484400000,\"temp\":-11}" + + System.getProperty("line.separator") + + "{\"station\":\"012650-99999\",\"time\":-655531200000,\"temp\":111}" + + System.getProperty("line.separator") + + "{\"station\":\"012650-99999\",\"time\":-655509600000,\"temp\":78}" + + System.getProperty("line.separator"); + + assertEquals(expectedOutput, output); + } + + private String inputStreamToString(InputStream stream) throws IOException { + StringWriter writer = new StringWriter(); + IOUtils.copy(stream, writer); + return writer.toString(); + } + + private void createAvroFile(byte[] contents) throws IOException { + (new File(TEST_ROOT_DIR)).mkdir(); + File file = new File(AVRO_FILENAME); + file.createNewFile(); + FileOutputStream stream = new FileOutputStream(file); + stream.write(contents); + stream.close(); + } + + private byte[] generateWeatherAvroBinaryData() { + // The contents of a simple binary Avro file with weather records. + byte[] contents = { + (byte) 0x4f, (byte) 0x62, (byte) 0x6a, (byte) 0x1, + (byte) 0x4, (byte) 0x14, (byte) 0x61, (byte) 0x76, + (byte) 0x72, (byte) 0x6f, (byte) 0x2e, (byte) 0x63, + (byte) 0x6f, (byte) 0x64, (byte) 0x65, (byte) 0x63, + (byte) 0x8, (byte) 0x6e, (byte) 0x75, (byte) 0x6c, + (byte) 0x6c, (byte) 0x16, (byte) 0x61, (byte) 0x76, + (byte) 0x72, (byte) 0x6f, (byte) 0x2e, (byte) 0x73, + (byte) 0x63, (byte) 0x68, (byte) 0x65, (byte) 0x6d, + (byte) 0x61, (byte) 0xf2, (byte) 0x2, (byte) 0x7b, + (byte) 0x22, (byte) 0x74, (byte) 0x79, (byte) 0x70, + (byte) 0x65, (byte) 0x22, (byte) 0x3a, (byte) 0x22, + (byte) 0x72, (byte) 0x65, (byte) 0x63, (byte) 0x6f, + (byte) 0x72, (byte) 0x64, (byte) 0x22, (byte) 0x2c, + (byte) 0x22, (byte) 0x6e, (byte) 0x61, (byte) 0x6d, + (byte) 0x65, (byte) 0x22, (byte) 0x3a, (byte) 0x22, + (byte) 0x57, (byte) 0x65, (byte) 0x61, (byte) 0x74, + (byte) 0x68, (byte) 0x65, (byte) 0x72, (byte) 0x22, + (byte) 0x2c, (byte) 0x22, (byte) 0x6e, (byte) 0x61, + (byte) 0x6d, (byte) 0x65, (byte) 0x73, (byte) 0x70, + (byte) 0x61, (byte) 0x63, (byte) 0x65, (byte) 0x22, + (byte) 0x3a, (byte) 0x22, (byte) 0x74, (byte) 0x65, + (byte) 0x73, (byte) 0x74, (byte) 0x22, (byte) 0x2c, + (byte) 0x22, (byte) 0x66, (byte) 0x69, (byte) 0x65, + (byte) 0x6c, (byte) 0x64, (byte) 0x73, (byte) 0x22, + (byte) 0x3a, (byte) 0x5b, (byte) 0x7b, (byte) 0x22, + (byte) 0x6e, (byte) 0x61, (byte) 0x6d, (byte) 0x65, + (byte) 0x22, (byte) 0x3a, (byte) 0x22, (byte) 0x73, + (byte) 0x74, (byte) 0x61, (byte) 0x74, (byte) 0x69, + (byte) 0x6f, (byte) 0x6e, (byte) 0x22, (byte) 0x2c, + (byte) 0x22, (byte) 0x74, (byte) 0x79, (byte) 0x70, + (byte) 0x65, (byte) 0x22, (byte) 0x3a, (byte) 0x22, + (byte) 0x73, (byte) 0x74, (byte) 0x72, (byte) 0x69, + (byte) 0x6e, (byte) 0x67, (byte) 0x22, (byte) 0x7d, + (byte) 0x2c, (byte) 0x7b, (byte) 0x22, (byte) 0x6e, + (byte) 0x61, (byte) 0x6d, (byte) 0x65, (byte) 0x22, + (byte) 0x3a, (byte) 0x22, (byte) 0x74, (byte) 0x69, + (byte) 0x6d, (byte) 0x65, (byte) 0x22, (byte) 0x2c, + (byte) 0x22, (byte) 0x74, (byte) 0x79, (byte) 0x70, + (byte) 0x65, (byte) 0x22, (byte) 0x3a, (byte) 0x22, + (byte) 0x6c, (byte) 0x6f, (byte) 0x6e, (byte) 0x67, + (byte) 0x22, (byte) 0x7d, (byte) 0x2c, (byte) 0x7b, + (byte) 0x22, (byte) 0x6e, (byte) 0x61, (byte) 0x6d, + (byte) 0x65, (byte) 0x22, (byte) 0x3a, (byte) 0x22, + (byte) 0x74, (byte) 0x65, (byte) 0x6d, (byte) 0x70, + (byte) 0x22, (byte) 0x2c, (byte) 0x22, (byte) 0x74, + (byte) 0x79, (byte) 0x70, (byte) 0x65, (byte) 0x22, + (byte) 0x3a, (byte) 0x22, (byte) 0x69, (byte) 0x6e, + (byte) 0x74, (byte) 0x22, (byte) 0x7d, (byte) 0x5d, + (byte) 0x2c, (byte) 0x22, (byte) 0x64, (byte) 0x6f, + (byte) 0x63, (byte) 0x22, (byte) 0x3a, (byte) 0x22, + (byte) 0x41, (byte) 0x20, (byte) 0x77, (byte) 0x65, + (byte) 0x61, (byte) 0x74, (byte) 0x68, (byte) 0x65, + (byte) 0x72, (byte) 0x20, (byte) 0x72, (byte) 0x65, + (byte) 0x61, (byte) 0x64, (byte) 0x69, (byte) 0x6e, + (byte) 0x67, (byte) 0x2e, (byte) 0x22, (byte) 0x7d, + (byte) 0x0, (byte) 0xb0, (byte) 0x81, (byte) 0xb3, + (byte) 0xc4, (byte) 0xa, (byte) 0xc, (byte) 0xf6, + (byte) 0x62, (byte) 0xfa, (byte) 0xc9, (byte) 0x38, + (byte) 0xfd, (byte) 0x7e, (byte) 0x52, (byte) 0x0, + (byte) 0xa7, (byte) 0xa, (byte) 0xcc, (byte) 0x1, + (byte) 0x18, (byte) 0x30, (byte) 0x31, (byte) 0x31, + (byte) 0x39, (byte) 0x39, (byte) 0x30, (byte) 0x2d, + (byte) 0x39, (byte) 0x39, (byte) 0x39, (byte) 0x39, + (byte) 0x39, (byte) 0xff, (byte) 0xa3, (byte) 0x90, + (byte) 0xe8, (byte) 0x87, (byte) 0x24, (byte) 0x0, + (byte) 0x18, (byte) 0x30, (byte) 0x31, (byte) 0x31, + (byte) 0x39, (byte) 0x39, (byte) 0x30, (byte) 0x2d, + (byte) 0x39, (byte) 0x39, (byte) 0x39, (byte) 0x39, + (byte) 0x39, (byte) 0xff, (byte) 0x81, (byte) 0xfb, + (byte) 0xd6, (byte) 0x87, (byte) 0x24, (byte) 0x2c, + (byte) 0x18, (byte) 0x30, (byte) 0x31, (byte) 0x31, + (byte) 0x39, (byte) 0x39, (byte) 0x30, (byte) 0x2d, + (byte) 0x39, (byte) 0x39, (byte) 0x39, (byte) 0x39, + (byte) 0x39, (byte) 0xff, (byte) 0xa5, (byte) 0xae, + (byte) 0xc2, (byte) 0x87, (byte) 0x24, (byte) 0x15, + (byte) 0x18, (byte) 0x30, (byte) 0x31, (byte) 0x32, + (byte) 0x36, (byte) 0x35, (byte) 0x30, (byte) 0x2d, + (byte) 0x39, (byte) 0x39, (byte) 0x39, (byte) 0x39, + (byte) 0x39, (byte) 0xff, (byte) 0xb7, (byte) 0xa2, + (byte) 0x8b, (byte) 0x94, (byte) 0x26, (byte) 0xde, + (byte) 0x1, (byte) 0x18, (byte) 0x30, (byte) 0x31, + (byte) 0x32, (byte) 0x36, (byte) 0x35, (byte) 0x30, + (byte) 0x2d, (byte) 0x39, (byte) 0x39, (byte) 0x39, + (byte) 0x39, (byte) 0x39, (byte) 0xff, (byte) 0xdb, + (byte) 0xd5, (byte) 0xf6, (byte) 0x93, (byte) 0x26, + (byte) 0x9c, (byte) 0x1, (byte) 0xb0, (byte) 0x81, + (byte) 0xb3, (byte) 0xc4, (byte) 0xa, (byte) 0xc, + (byte) 0xf6, (byte) 0x62, (byte) 0xfa, (byte) 0xc9, + (byte) 0x38, (byte) 0xfd, (byte) 0x7e, (byte) 0x52, + (byte) 0x0, (byte) 0xa7, + }; + + return contents; + } +} +