diff --git a/CHANGES.txt b/CHANGES.txt index e841f83a1d6..9a3c28d54b0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -152,6 +152,9 @@ Trunk (unreleased changes) HADOOP-5976. Add a new command, classpath, to the hadoop script. (Owen O'Malley and Gary Murry via szetszwo) + HADOOP-6120. Add support for Avro specific and reflect data. + (sharad via cutting) + IMPROVEMENTS HADOOP-4565. Added CombineFileInputFormat to use data locality information diff --git a/build.xml b/build.xml index 9aec9f62f27..2029c84be71 100644 --- a/build.xml +++ b/build.xml @@ -416,11 +416,22 @@ includes="**/*.jr" /> - + + + + + + + + + + + + - + + + + diff --git a/ivy/ivysettings.xml b/ivy/ivysettings.xml index a7fcd220317..d5765f35060 100644 --- a/ivy/ivysettings.xml +++ b/ivy/ivysettings.xml @@ -74,7 +74,7 @@ rather than look for them online. --> - + diff --git a/src/java/core-default.xml b/src/java/core-default.xml index 8fc055069f5..401cccb27d6 100644 --- a/src/java/core-default.xml +++ b/src/java/core-default.xml @@ -85,7 +85,7 @@ io.serializations - org.apache.hadoop.io.serializer.WritableSerialization + org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization A list of serialization classes that can be used for obtaining serializers and deserializers. diff --git a/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java b/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java index f4ba54b4e49..4051e097dcc 100644 --- a/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java +++ b/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java @@ -25,6 +25,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization; +import org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -50,7 +52,9 @@ public class SerializationFactory extends Configured { public SerializationFactory(Configuration conf) { super(conf); for (String serializerName : conf.getStrings("io.serializations", - new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"})) { + new String[]{WritableSerialization.class.getName(), + AvroSpecificSerialization.class.getName(), + AvroReflectSerialization.class.getName()})) { add(conf, serializerName); } } diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerializable.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerializable.java new file mode 100644 index 00000000000..1ea274e700d --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerializable.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer.avro; + +/** + * Tag interface for Avro 'reflect' serializable classes. Classes implementing + * this interface can be serialized/deserialized using + * {@link AvroReflectSerialization}. + */ +public interface AvroReflectSerializable { + +} diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java new file mode 100644 index 00000000000..5805bed4f3c --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer.avro; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; + +/** + * Serialization for Avro Reflect classes. For a class to be accepted by this + * serialization, it must either be in the package list configured via + * {@link AvroReflectSerialization#AVRO_REFLECT_PACKAGES} or implement + * {@link AvroReflectSerializable} interface. + * + */ +@SuppressWarnings("unchecked") +public class AvroReflectSerialization extends AvroSerialization{ + + /** + * Key to configure packages that contain classes to be serialized and + * deserialized using this class. Multiple packages can be specified using + * comma-separated list. + */ + public static final String AVRO_REFLECT_PACKAGES = "avro.reflect.pkgs"; + + private Set packages; + + public synchronized boolean accept(Class c) { + if (packages == null) { + getPackages(); + } + return AvroReflectSerializable.class.isAssignableFrom(c) || + packages.contains(c.getPackage().getName()); + } + + private void getPackages() { + String[] pkgList = getConf().getStrings(AVRO_REFLECT_PACKAGES); + packages = new HashSet(); + if (pkgList != null) { + for (String pkg : pkgList) { + packages.add(pkg.trim()); + } + } + } + + protected DatumReader getReader(Class clazz) { + try { + String prefix = + ((clazz.getEnclosingClass() == null + || "null".equals(clazz.getEnclosingClass().getName())) ? + clazz.getPackage().getName() + "." + : (clazz.getEnclosingClass().getName() + "$")); + return new ReflectDatumReader(ReflectData.getSchema(clazz), prefix); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected Schema getSchema(Object t) { + return ReflectData.getSchema(t.getClass()); + } + + protected DatumWriter getWriter(Class clazz) { + return new ReflectDatumWriter(); + } + +} diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java new file mode 100644 index 00000000000..54667c98685 --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer.avro; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; + +/** + * Base class for providing serialization to Avro types. + */ +public abstract class AvroSerialization extends Configured + implements Serialization{ + + public Deserializer getDeserializer(Class c) { + return new AvroDeserializer(c); + } + + public Serializer getSerializer(Class c) { + return new AvroSerializer(c); + } + + /** + * Return an Avro Schema instance for the given class. + */ + protected abstract Schema getSchema(T t); + + /** + * Create and return Avro DatumWriter for the given class. + */ + protected abstract DatumWriter getWriter(Class clazz); + + /** + * Create and return Avro DatumReader for the given class. + */ + protected abstract DatumReader getReader(Class clazz); + + class AvroSerializer implements Serializer { + + private DatumWriter writer; + private BinaryEncoder encoder; + private OutputStream outStream; + protected Class clazz; + + AvroSerializer(Class clazz) { + writer = getWriter(clazz); + } + + public void close() throws IOException { + encoder.flush(); + outStream.close(); + } + + public void open(OutputStream out) throws IOException { + outStream = out; + encoder = new BinaryEncoder(out); + } + + public void serialize(T t) throws IOException { + writer.setSchema(getSchema(t)); + writer.write(t, encoder); + } + + } + + class AvroDeserializer implements Deserializer { + + private DatumReader reader; + private BinaryDecoder decoder; + private InputStream inStream; + + AvroDeserializer(Class clazz) { + this.reader = getReader(clazz); + } + + public void close() throws IOException { + inStream.close(); + } + + public T deserialize(T t) throws IOException { + return reader.read(t, decoder); + } + + public void open(InputStream in) throws IOException { + inStream = in; + decoder = new BinaryDecoder(in); + } + + } + +} diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java new file mode 100644 index 00000000000..a44e402906d --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer.avro; + +import org.apache.avro.Schema; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; + +/** + * Serialization for Avro Specific classes. This serialization is to be used + * for classes generated by Avro's 'specific' compiler. + */ +@SuppressWarnings("unchecked") +public class AvroSpecificSerialization + extends AvroSerialization{ + + public boolean accept(Class c) { + return SpecificRecord.class.isAssignableFrom(c); + } + + protected DatumReader getReader(Class clazz) { + try { + return new SpecificDatumReader(clazz.newInstance().schema()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected Schema getSchema(SpecificRecord t) { + return t.schema(); + } + + protected DatumWriter getWriter(Class clazz) { + return new SpecificDatumWriter(); + } + +} diff --git a/src/java/org/apache/hadoop/io/serializer/avro/package.html b/src/java/org/apache/hadoop/io/serializer/avro/package.html new file mode 100644 index 00000000000..ce565f6206b --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/avro/package.html @@ -0,0 +1,43 @@ + + + + + + +

+This package provides Avro serialization in Hadoop. This can be used to +serialize/deserialize Avro types in Hadoop. +

+ +

+Use {@link org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization} for +serialization of classes generated by Avro's 'specific' compiler. +

+ +

+Use {@link org.apache.hadoop.io.serializer.avro.AvroReflectSerialization} for +other classes. +{@link org.apache.hadoop.io.serializer.avro.AvroReflectSerialization} work for +any class which is either in the package list configured via +{@link org.apache.hadoop.io.serializer.avro.AvroReflectSerialization#AVRO_REFLECT_PACKAGES} +or implement {@link org.apache.hadoop.io.serializer.avro.AvroReflectSerializable} +interface. +

+ + + diff --git a/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java b/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java new file mode 100644 index 00000000000..8d47db26cd2 --- /dev/null +++ b/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.serializer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.util.GenericsUtil; + +public class SerializationTestUtil { + + /** + * A utility that tests serialization/deserialization. + * @param the class of the item + * @param conf configuration to use, "io.serializations" is read to + * determine the serialization + * @param before item to (de)serialize + * @return deserialized item + */ + public static K testSerialization(Configuration conf, K before) + throws Exception { + + SerializationFactory factory = new SerializationFactory(conf); + Serializer serializer + = factory.getSerializer(GenericsUtil.getClass(before)); + Deserializer deserializer + = factory.getDeserializer(GenericsUtil.getClass(before)); + + DataOutputBuffer out = new DataOutputBuffer(); + serializer.open(out); + serializer.serialize(before); + serializer.close(); + + DataInputBuffer in = new DataInputBuffer(); + in.reset(out.getData(), out.getLength()); + deserializer.open(in); + K after = deserializer.deserialize(null); + deserializer.close(); + return after; + } + +} diff --git a/src/test/core/org/apache/hadoop/io/serializer/TestWritableSerialization.java b/src/test/core/org/apache/hadoop/io/serializer/TestWritableSerialization.java index 6a551753245..0d7c50b42bf 100644 --- a/src/test/core/org/apache/hadoop/io/serializer/TestWritableSerialization.java +++ b/src/test/core/org/apache/hadoop/io/serializer/TestWritableSerialization.java @@ -23,25 +23,18 @@ import static org.apache.hadoop.io.TestGenericWritable.CONF_TEST_VALUE; import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.TestGenericWritable.Baz; import org.apache.hadoop.io.TestGenericWritable.FooGenericWritable; -import org.apache.hadoop.util.GenericsUtil; public class TestWritableSerialization extends TestCase { private static final Configuration conf = new Configuration(); - - static { - conf.set("io.serializations" - , "org.apache.hadoop.io.serializer.WritableSerialization"); - } - + public void testWritableSerialization() throws Exception { Text before = new Text("test writable"); - testSerialization(conf, before); + Text after = SerializationTestUtil.testSerialization(conf, before); + assertEquals(before, after); } @@ -56,40 +49,8 @@ public class TestWritableSerialization extends TestCase { generic.setConf(conf); Baz baz = new Baz(); generic.set(baz); - Baz result = testSerialization(conf, baz); + Baz result = SerializationTestUtil.testSerialization(conf, baz); + assertEquals(baz, result); assertNotNull(result.getConf()); } - - /** - * A utility that tests serialization/deserialization. - * @param the class of the item - * @param conf configuration to use, "io.serializations" is read to - * determine the serialization - * @param before item to (de)serialize - * @return deserialized item - */ - public static K testSerialization(Configuration conf, K before) - throws Exception { - - SerializationFactory factory = new SerializationFactory(conf); - Serializer serializer - = factory.getSerializer(GenericsUtil.getClass(before)); - Deserializer deserializer - = factory.getDeserializer(GenericsUtil.getClass(before)); - - DataOutputBuffer out = new DataOutputBuffer(); - serializer.open(out); - serializer.serialize(before); - serializer.close(); - - DataInputBuffer in = new DataInputBuffer(); - in.reset(out.getData(), out.getLength()); - deserializer.open(in); - K after = deserializer.deserialize(null); - deserializer.close(); - - assertEquals(before, after); - return after; - } - } diff --git a/src/test/core/org/apache/hadoop/io/serializer/avro/Record.java b/src/test/core/org/apache/hadoop/io/serializer/avro/Record.java new file mode 100644 index 00000000000..275a0dc1e28 --- /dev/null +++ b/src/test/core/org/apache/hadoop/io/serializer/avro/Record.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer.avro; + +public class Record { + public int x = 7; + + public int hashCode() { + return x; + } + + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + final Record other = (Record) obj; + if (x != other.x) + return false; + return true; + } +} \ No newline at end of file diff --git a/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java b/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java new file mode 100644 index 00000000000..e65a299fa3c --- /dev/null +++ b/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer.avro; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.serializer.SerializationTestUtil; + +public class TestAvroSerialization extends TestCase { + + private static final Configuration conf = new Configuration(); + + public void testSpecific() throws Exception { + AvroRecord before = new AvroRecord(); + before.intField = 5; + AvroRecord after = SerializationTestUtil.testSerialization(conf, before); + assertEquals(before, after); + } + + public void testReflectPkg() throws Exception { + Record before = new Record(); + before.x = 10; + conf.set(AvroReflectSerialization.AVRO_REFLECT_PACKAGES, + before.getClass().getPackage().getName()); + Record after = SerializationTestUtil.testSerialization(conf, before); + assertEquals(before, after); + } + + public void testReflectInnerClass() throws Exception { + InnerRecord before = new InnerRecord(); + before.x = 10; + conf.set(AvroReflectSerialization.AVRO_REFLECT_PACKAGES, + before.getClass().getPackage().getName()); + InnerRecord after = SerializationTestUtil.testSerialization(conf, before); + assertEquals(before, after); + } + + public void testReflect() throws Exception { + RefSerializable before = new RefSerializable(); + before.x = 10; + RefSerializable after = + SerializationTestUtil.testSerialization(conf, before); + assertEquals(before, after); + } + + public static class InnerRecord { + public int x = 7; + + public int hashCode() { + return x; + } + + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + final InnerRecord other = (InnerRecord) obj; + if (x != other.x) + return false; + return true; + } + } + + public static class RefSerializable implements AvroReflectSerializable { + public int x = 7; + + public int hashCode() { + return x; + } + + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + final RefSerializable other = (RefSerializable) obj; + if (x != other.x) + return false; + return true; + } + } +} diff --git a/src/test/core/org/apache/hadoop/io/serializer/avro/avroRecord.avsc b/src/test/core/org/apache/hadoop/io/serializer/avro/avroRecord.avsc new file mode 100644 index 00000000000..7e6ff617d7a --- /dev/null +++ b/src/test/core/org/apache/hadoop/io/serializer/avro/avroRecord.avsc @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +{"type": "record", "name":"AvroRecord", + "namespace": "org.apache.hadoop.io.serializer.avro", + "fields": [ + {"name": "intField", "type": "int"} + ] +}