diff --git a/CHANGES.txt b/CHANGES.txt index e1e80ab828b..e48a0084f86 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -166,6 +166,8 @@ Trunk (unreleased changes) the io package and makes it available to other users (MAPREDUCE-318). (Jothi Padmanabhan via ddas) + HADOOP-6165. Add metadata to Serializations. (tomwhite) + IMPROVEMENTS HADOOP-4565. Added CombineFileInputFormat to use data locality information diff --git a/src/java/core-default.xml b/src/java/core-default.xml index 90e3e711593..6361636851f 100644 --- a/src/java/core-default.xml +++ b/src/java/core-default.xml @@ -101,7 +101,7 @@ io.serializations - org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization + org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization,org.apache.hadoop.io.serializer.avro.AvroGenericSerialization A list of serialization classes that can be used for obtaining serializers and deserializers. diff --git a/src/java/org/apache/hadoop/io/DefaultStringifier.java b/src/java/org/apache/hadoop/io/DefaultStringifier.java index 124a550942d..ff606ff3eb3 100644 --- a/src/java/org/apache/hadoop/io/DefaultStringifier.java +++ b/src/java/org/apache/hadoop/io/DefaultStringifier.java @@ -21,20 +21,21 @@ package org.apache.hadoop.io; import java.io.IOException; import java.nio.charset.UnsupportedCharsetException; import java.util.ArrayList; +import java.util.Map; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.DeserializerBase; +import org.apache.hadoop.io.serializer.SerializationBase; import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.io.serializer.SerializerBase; import org.apache.hadoop.util.GenericsUtil; /** * DefaultStringifier is the default implementation of the {@link Stringifier} * interface which stringifies the objects using base64 encoding of the - * serialized version of the objects. The {@link Serializer} and - * {@link Deserializer} are obtained from the {@link SerializationFactory}. + * serialized version of the objects. The {@link SerializerBase} and + * {@link DeserializerBase} are obtained from the {@link SerializationFactory}. *
* DefaultStringifier offers convenience methods to store/load objects to/from * the configuration. @@ -45,9 +46,9 @@ public class DefaultStringifier implements Stringifier { private static final String SEPARATOR = ","; - private Serializer serializer; + private SerializerBase serializer; - private Deserializer deserializer; + private DeserializerBase deserializer; private DataInputBuffer inBuf; @@ -56,8 +57,9 @@ public class DefaultStringifier implements Stringifier { public DefaultStringifier(Configuration conf, Class c) { SerializationFactory factory = new SerializationFactory(conf); - this.serializer = factory.getSerializer(c); - this.deserializer = factory.getDeserializer(c); + Map metadata = SerializationBase.getMetadataFromClass(c); + this.serializer = factory.getSerializer(metadata); + this.deserializer = factory.getDeserializer(metadata); this.inBuf = new DataInputBuffer(); this.outBuf = new DataOutputBuffer(); try { @@ -102,7 +104,7 @@ public class DefaultStringifier implements Stringifier { * @param item the object to be stored * @param keyName the name of the key to use * @throws IOException : forwards Exceptions from the underlying - * {@link Serialization} classes. + * {@link SerializationBase} classes. */ public static void store(Configuration conf, K item, String keyName) throws IOException { @@ -122,7 +124,7 @@ public class DefaultStringifier implements Stringifier { * @param itemClass the class of the item * @return restored object * @throws IOException : forwards Exceptions from the underlying - * {@link Serialization} classes. + * {@link SerializationBase} classes. */ public static K load(Configuration conf, String keyName, Class itemClass) throws IOException { @@ -145,7 +147,7 @@ public class DefaultStringifier implements Stringifier { * @param keyName the name of the key to use * @throws IndexOutOfBoundsException if the items array is empty * @throws IOException : forwards Exceptions from the underlying - * {@link Serialization} classes. + * {@link SerializationBase} classes. */ public static void storeArray(Configuration conf, K[] items, String keyName) throws IOException { @@ -173,7 +175,7 @@ public class DefaultStringifier implements Stringifier { * @param itemClass the class of the item * @return restored object * @throws IOException : forwards Exceptions from the underlying - * {@link Serialization} classes. + * {@link SerializationBase} classes. */ public static K[] loadArray(Configuration conf, String keyName, Class itemClass) throws IOException { diff --git a/src/java/org/apache/hadoop/io/SequenceFile.java b/src/java/org/apache/hadoop/io/SequenceFile.java index 7e653e0f4cc..ae494f948ed 100644 --- a/src/java/org/apache/hadoop/io/SequenceFile.java +++ b/src/java/org/apache/hadoop/io/SequenceFile.java @@ -33,9 +33,10 @@ import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.zlib.ZlibFactory; -import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.DeserializerBase; +import org.apache.hadoop.io.serializer.SerializationBase; +import org.apache.hadoop.io.serializer.SerializerBase; import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.io.serializer.Serializer; import org.apache.hadoop.conf.*; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progress; @@ -705,6 +706,14 @@ public class SequenceFile { return new TreeMap(this.theMetadata); } + public Map getMetadataAsStringMap() { + Map map = new HashMap(); + for (Map.Entry entry : theMetadata.entrySet()) { + map.put(entry.getKey().toString(), entry.getValue().toString()); + } + return map; + } + public void write(DataOutput out) throws IOException { out.writeInt(this.theMetadata.size()); Iterator> iter = @@ -801,9 +810,9 @@ public class SequenceFile { Metadata metadata = null; Compressor compressor = null; - protected Serializer keySerializer; - protected Serializer uncompressedValSerializer; - protected Serializer compressedValSerializer; + protected SerializerBase keySerializer; + protected SerializerBase uncompressedValSerializer; + protected SerializerBase compressedValSerializer; // Insert a globally unique 16-byte value every few entries, so that one // can seek into the middle of a file and then synchronize with record @@ -914,9 +923,10 @@ public class SequenceFile { this.codec = codec; this.metadata = metadata; SerializationFactory serializationFactory = new SerializationFactory(conf); - this.keySerializer = serializationFactory.getSerializer(keyClass); + this.keySerializer = getSerializer(serializationFactory, keyClass, metadata); this.keySerializer.open(buffer); - this.uncompressedValSerializer = serializationFactory.getSerializer(valClass); + this.uncompressedValSerializer = getSerializer(serializationFactory, + valClass, metadata); this.uncompressedValSerializer.open(buffer); if (this.codec != null) { ReflectionUtils.setConf(this.codec, this.conf); @@ -924,11 +934,20 @@ public class SequenceFile { this.deflateFilter = this.codec.createOutputStream(buffer, compressor); this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter)); - this.compressedValSerializer = serializationFactory.getSerializer(valClass); + this.compressedValSerializer = getSerializer(serializationFactory, + valClass, metadata); this.compressedValSerializer.open(deflateOut); } } + @SuppressWarnings("unchecked") + private SerializerBase getSerializer(SerializationFactory sf, Class c, + Metadata metadata) { + Map stringMetadata = metadata.getMetadataAsStringMap(); + stringMetadata.put(SerializationBase.CLASS_KEY, c.getName()); + return sf.getSerializer(stringMetadata); + } + /** Returns the class of keys in this file. */ public Class getKeyClass() { return keyClass; } @@ -1412,8 +1431,8 @@ public class SequenceFile { private DataInputStream valIn = null; private Decompressor valDecompressor = null; - private Deserializer keyDeserializer; - private Deserializer valDeserializer; + private DeserializerBase keyDeserializer; + private DeserializerBase valDeserializer; /** Open the named file. */ public Reader(FileSystem fs, Path file, Configuration conf) @@ -1563,21 +1582,24 @@ public class SequenceFile { SerializationFactory serializationFactory = new SerializationFactory(conf); this.keyDeserializer = - getDeserializer(serializationFactory, getKeyClass()); + getDeserializer(serializationFactory, getKeyClass(), metadata); if (!blockCompressed) { this.keyDeserializer.open(valBuffer); } else { this.keyDeserializer.open(keyIn); } this.valDeserializer = - getDeserializer(serializationFactory, getValueClass()); + getDeserializer(serializationFactory, getValueClass(), metadata); this.valDeserializer.open(valIn); } } @SuppressWarnings("unchecked") - private Deserializer getDeserializer(SerializationFactory sf, Class c) { - return sf.getDeserializer(c); + private DeserializerBase getDeserializer(SerializationFactory sf, Class c, + Metadata metadata) { + Map stringMetadata = metadata.getMetadataAsStringMap(); + stringMetadata.put(SerializationBase.CLASS_KEY, c.getName()); + return sf.getDeserializer(stringMetadata); } /** Close the file. */ diff --git a/src/java/org/apache/hadoop/io/serializer/Deserializer.java b/src/java/org/apache/hadoop/io/serializer/Deserializer.java index 1234a57b2b4..c2389b4c9e9 100644 --- a/src/java/org/apache/hadoop/io/serializer/Deserializer.java +++ b/src/java/org/apache/hadoop/io/serializer/Deserializer.java @@ -34,6 +34,7 @@ import java.io.InputStream; *

* @param */ +@Deprecated public interface Deserializer { /** *

Prepare the deserializer for reading.

diff --git a/src/java/org/apache/hadoop/io/serializer/DeserializerBase.java b/src/java/org/apache/hadoop/io/serializer/DeserializerBase.java new file mode 100644 index 00000000000..90bbc0e1aa3 --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/DeserializerBase.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.serializer; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configured; + +public abstract class DeserializerBase extends Configured + implements Closeable, Deserializer { + + /** + *

Prepare the deserializer for reading.

+ */ + public abstract void open(InputStream in) throws IOException; + + /** + *

+ * Deserialize the next object from the underlying input stream. + * If the object t is non-null then this deserializer + * may set its internal state to the next object read from the input + * stream. Otherwise, if the object t is null a new + * deserialized object will be created. + *

+ * @return the deserialized object + */ + public abstract T deserialize(T t) throws IOException; + +} diff --git a/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java b/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java index 70e8b689e9c..2b1980e995e 100644 --- a/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java +++ b/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java @@ -52,6 +52,13 @@ public abstract class DeserializerComparator implements RawComparator { this.deserializer.open(buffer); } + protected DeserializerComparator(DeserializerBase deserializer) + throws IOException { + + this.deserializer = deserializer; + this.deserializer.open(buffer); + } + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { diff --git a/src/java/org/apache/hadoop/io/serializer/LegacyDeserializer.java b/src/java/org/apache/hadoop/io/serializer/LegacyDeserializer.java new file mode 100644 index 00000000000..8e5036cc7b7 --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/LegacyDeserializer.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.serializer; + +import java.io.IOException; +import java.io.InputStream; + +@SuppressWarnings("deprecation") +class LegacyDeserializer extends DeserializerBase { + + private Deserializer deserializer; + + public LegacyDeserializer(Deserializer deserializer) { + this.deserializer = deserializer; + } + + @Override + public void open(InputStream in) throws IOException { + deserializer.open(in); + } + + @Override + public T deserialize(T t) throws IOException { + return deserializer.deserialize(t); + } + + @Override + public void close() throws IOException { + deserializer.close(); + } + +} diff --git a/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java b/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java new file mode 100644 index 00000000000..d5f0f9a8363 --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer; + +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; + +/** + *

+ * Wraps a legacy {@link Serialization} as a {@link SerializationBase}. + *

+ * + * @param + */ +@SuppressWarnings("deprecation") +class LegacySerialization extends SerializationBase { + + private Serialization serialization; + + public LegacySerialization(Serialization serialization, + Configuration conf) { + this.serialization = serialization; + setConf(conf); + } + + Serialization getUnderlyingSerialization() { + return serialization; + } + + @Deprecated + @Override + public boolean accept(Class c) { + return serialization.accept(c); + } + + @Deprecated + @Override + public Deserializer getDeserializer(Class c) { + return serialization.getDeserializer(c); + } + + @Deprecated + @Override + public Serializer getSerializer(Class c) { + return serialization.getSerializer(c); + } + + @Override + public boolean accept(Map metadata) { + Class c = getClassFromMetadata(metadata); + return accept(c); + } + + @SuppressWarnings("unchecked") + @Override + public SerializerBase getSerializer(Map metadata) { + Class c = (Class) getClassFromMetadata(metadata); + return new LegacySerializer(getSerializer(c)); + } + + @SuppressWarnings("unchecked") + @Override + public DeserializerBase getDeserializer(Map metadata) { + Class c = (Class) getClassFromMetadata(metadata); + return new LegacyDeserializer(getDeserializer(c)); + } + +} diff --git a/src/java/org/apache/hadoop/io/serializer/LegacySerializer.java b/src/java/org/apache/hadoop/io/serializer/LegacySerializer.java new file mode 100644 index 00000000000..f623cab1714 --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/LegacySerializer.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.serializer; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; + +@SuppressWarnings("deprecation") +class LegacySerializer extends SerializerBase { + + private Serializer serializer; + + public LegacySerializer(Serializer serializer) { + this.serializer = serializer; + } + + @Override + public void open(OutputStream out) throws IOException { + serializer.open(out); + } + + @Override + public void serialize(T t) throws IOException { + serializer.serialize(t); + } + + @Override + public void close() throws IOException { + serializer.close(); + } + + @Override + public Map getMetadata() throws IOException { + return Collections.emptyMap(); + } + +} diff --git a/src/java/org/apache/hadoop/io/serializer/Serialization.java b/src/java/org/apache/hadoop/io/serializer/Serialization.java index 6e724bd78b1..ec9681e3c84 100644 --- a/src/java/org/apache/hadoop/io/serializer/Serialization.java +++ b/src/java/org/apache/hadoop/io/serializer/Serialization.java @@ -24,6 +24,7 @@ package org.apache.hadoop.io.serializer; *

* @param */ +@Deprecated public interface Serialization { /** diff --git a/src/java/org/apache/hadoop/io/serializer/SerializationBase.java b/src/java/org/apache/hadoop/io/serializer/SerializationBase.java new file mode 100644 index 00000000000..01df3ef856e --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/SerializationBase.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configured; + +/** + *

+ * Encapsulates a {@link SerializerBase}/{@link DeserializerBase} pair. + *

+ * + * @param + */ +public abstract class SerializationBase extends Configured + implements Serialization { + + public static final String SERIALIZATION_KEY = "Serialization-Class"; + public static final String CLASS_KEY = "Serialized-Class"; + + public static Map getMetadataFromClass(Class c) { + Map metadata = new HashMap(); + metadata.put(CLASS_KEY, c.getName()); + return metadata; + } + + @Deprecated + @Override + public boolean accept(Class c) { + return accept(getMetadataFromClass(c)); + } + + @Deprecated + @Override + public Deserializer getDeserializer(Class c) { + return getDeserializer(getMetadataFromClass(c)); + } + + @Deprecated + @Override + public Serializer getSerializer(Class c) { + return getSerializer(getMetadataFromClass(c)); + } + + /** + * Allows clients to test whether this {@link SerializationBase} supports the + * given metadata. + */ + public abstract boolean accept(Map metadata); + + /** + * @return a {@link SerializerBase} for the given metadata. + */ + public abstract SerializerBase getSerializer(Map metadata); + + /** + * @return a {@link DeserializerBase} for the given metadata. + */ + public abstract DeserializerBase getDeserializer( + Map metadata); + + protected Class getClassFromMetadata(Map metadata) { + String classname = metadata.get(CLASS_KEY); + if (classname == null) { + return null; + } + try { + return getConf().getClassByName(classname); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + } +} diff --git a/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java b/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java index 4051e097dcc..9dec1537cf0 100644 --- a/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java +++ b/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java @@ -20,11 +20,13 @@ package org.apache.hadoop.io.serializer; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.serializer.avro.AvroGenericSerialization; import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization; import org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization; import org.apache.hadoop.util.ReflectionUtils; @@ -32,7 +34,7 @@ import org.apache.hadoop.util.StringUtils; /** *

- * A factory for {@link Serialization}s. + * A factory for {@link SerializationBase}s. *

*/ public class SerializationFactory extends Configured { @@ -40,7 +42,10 @@ public class SerializationFactory extends Configured { private static final Log LOG = LogFactory.getLog(SerializationFactory.class.getName()); - private List> serializations = new ArrayList>(); + private List> serializations = + new ArrayList>(); + private List> legacySerializations = + new ArrayList>(); /** *

@@ -54,7 +59,8 @@ public class SerializationFactory extends Configured { for (String serializerName : conf.getStrings("io.serializations", new String[]{WritableSerialization.class.getName(), AvroSpecificSerialization.class.getName(), - AvroReflectSerialization.class.getName()})) { + AvroReflectSerialization.class.getName(), + AvroGenericSerialization.class.getName()})) { add(conf, serializerName); } } @@ -62,30 +68,62 @@ public class SerializationFactory extends Configured { @SuppressWarnings("unchecked") private void add(Configuration conf, String serializationName) { try { - - Class serializionClass = - (Class) conf.getClassByName(serializationName); - serializations.add((Serialization) - ReflectionUtils.newInstance(serializionClass, getConf())); + Class serializationClass = conf.getClassByName(serializationName); + if (SerializationBase.class.isAssignableFrom(serializationClass)) { + serializations.add((SerializationBase) + ReflectionUtils.newInstance(serializationClass, getConf())); + } else if (Serialization.class.isAssignableFrom(serializationClass)) { + Serialization serialization = (Serialization) + ReflectionUtils.newInstance(serializationClass, getConf()); + legacySerializations.add(new LegacySerialization(serialization, + getConf())); + } else { + LOG.warn("Serialization class " + serializationName + " is not an " + + "instance of Serialization or BaseSerialization."); + } } catch (ClassNotFoundException e) { - LOG.warn("Serilization class not found: " + + LOG.warn("Serialization class not found: " + StringUtils.stringifyException(e)); } } + @Deprecated public Serializer getSerializer(Class c) { return getSerialization(c).getSerializer(c); } + @Deprecated public Deserializer getDeserializer(Class c) { return getSerialization(c).getDeserializer(c); } - @SuppressWarnings("unchecked") + @Deprecated public Serialization getSerialization(Class c) { - for (Serialization serialization : serializations) { - if (serialization.accept(c)) { - return (Serialization) serialization; + return getSerialization(SerializationBase.getMetadataFromClass(c)); + } + + public SerializerBase getSerializer(Map metadata) { + SerializationBase serialization = getSerialization(metadata); + return serialization.getSerializer(metadata); + } + + public DeserializerBase getDeserializer(Map metadata) { + SerializationBase serialization = getSerialization(metadata); + return serialization.getDeserializer(metadata); + } + + @SuppressWarnings("unchecked") + public SerializationBase getSerialization(Map metadata) { + for (SerializationBase serialization : serializations) { + if (serialization.accept(metadata)) { + return (SerializationBase) serialization; + } + } + // Look in the legacy serializations last, since they ignore + // non-class metadata + for (SerializationBase serialization : legacySerializations) { + if (serialization.accept(metadata)) { + return (SerializationBase) serialization; } } return null; diff --git a/src/java/org/apache/hadoop/io/serializer/Serializer.java b/src/java/org/apache/hadoop/io/serializer/Serializer.java index b3243f5b6b8..35c8868b11c 100644 --- a/src/java/org/apache/hadoop/io/serializer/Serializer.java +++ b/src/java/org/apache/hadoop/io/serializer/Serializer.java @@ -34,6 +34,7 @@ import java.io.OutputStream; *

* @param */ +@Deprecated public interface Serializer { /** *

Prepare the serializer for writing.

diff --git a/src/java/org/apache/hadoop/io/serializer/SerializerBase.java b/src/java/org/apache/hadoop/io/serializer/SerializerBase.java new file mode 100644 index 00000000000..7c2cb347d21 --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/SerializerBase.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.serializer; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; + +import org.apache.hadoop.conf.Configured; + +public abstract class SerializerBase extends Configured + implements Closeable, Serializer { + + /** + *

Prepare the serializer for writing.

+ */ + public abstract void open(OutputStream out) throws IOException; + + /** + *

Serialize t to the underlying output stream.

+ */ + public abstract void serialize(T t) throws IOException; + + public abstract Map getMetadata() throws IOException; + +} diff --git a/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java b/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java index 47586e8c2dd..04211a185cf 100644 --- a/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java @@ -23,22 +23,20 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; /** - * A {@link Serialization} for {@link Writable}s that delegates to + * A {@link SerializationBase} for {@link Writable}s that delegates to * {@link Writable#write(java.io.DataOutput)} and * {@link Writable#readFields(java.io.DataInput)}. */ -public class WritableSerialization extends Configured - implements Serialization { +public class WritableSerialization extends SerializationBase { - static class WritableDeserializer extends Configured - implements Deserializer { + static class WritableDeserializer extends DeserializerBase { private Class writableClass; private DataInputStream dataIn; @@ -48,6 +46,7 @@ public class WritableSerialization extends Configured this.writableClass = c; } + @Override public void open(InputStream in) { if (in instanceof DataInputStream) { dataIn = (DataInputStream) in; @@ -56,6 +55,7 @@ public class WritableSerialization extends Configured } } + @Override public Writable deserialize(Writable w) throws IOException { Writable writable; if (w == null) { @@ -68,16 +68,23 @@ public class WritableSerialization extends Configured return writable; } + @Override public void close() throws IOException { dataIn.close(); } } - static class WritableSerializer implements Serializer { - + static class WritableSerializer extends SerializerBase { + + private Map metadata; private DataOutputStream dataOut; + public WritableSerializer(Map metadata) { + this.metadata = metadata; + } + + @Override public void open(OutputStream out) { if (out instanceof DataOutputStream) { dataOut = (DataOutputStream) out; @@ -86,26 +93,41 @@ public class WritableSerialization extends Configured } } + @Override public void serialize(Writable w) throws IOException { w.write(dataOut); } + @Override public void close() throws IOException { dataOut.close(); } + @Override + public Map getMetadata() throws IOException { + return metadata; + } + } - public boolean accept(Class c) { - return Writable.class.isAssignableFrom(c); + @Override + public boolean accept(Map metadata) { + if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) { + return true; + } + Class c = getClassFromMetadata(metadata); + return c == null ? false : Writable.class.isAssignableFrom(c); } - public Deserializer getDeserializer(Class c) { + @Override + public SerializerBase getSerializer(Map metadata) { + return new WritableSerializer(metadata); + } + + @Override + public DeserializerBase getDeserializer(Map metadata) { + Class c = getClassFromMetadata(metadata); return new WritableDeserializer(getConf(), c); } - public Serializer getSerializer(Class c) { - return new WritableSerializer(); - } - } diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java new file mode 100644 index 00000000000..2ea4cdd3435 --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer.avro; + +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.hadoop.io.serializer.SerializationBase; + +/** + * Serialization for Avro Generic classes. For a class to be accepted by this + * serialization it must have metadata with key + * {@link SerializationBase#SERIALIZATION_KEY} set to {@link AvroGenericSerialization}'s + * fully-qualified classname. + * The schema used is the one set by {@link AvroSerialization#AVRO_SCHEMA_KEY}. + */ +@SuppressWarnings("unchecked") +public class AvroGenericSerialization extends AvroSerialization { + + @Override + public boolean accept(Map metadata) { + return metadata.get(AVRO_SCHEMA_KEY) != null; + } + + @Override + protected DatumReader getReader(Map metadata) { + Schema schema = Schema.parse(metadata.get(AVRO_SCHEMA_KEY)); + return new GenericDatumReader(schema); + } + + @Override + protected Schema getSchema(Object t, Map metadata) { + String jsonSchema = metadata.get(AVRO_SCHEMA_KEY); + return jsonSchema != null ? Schema.parse(jsonSchema) : GenericData.induce(t); + } + + @Override + protected DatumWriter getWriter(Map metadata) { + return new GenericDatumWriter(); + } + +} diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java index 5805bed4f3c..649c72e8a76 100644 --- a/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java @@ -19,6 +19,7 @@ package org.apache.hadoop.io.serializer.avro; import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.apache.avro.Schema; @@ -27,6 +28,7 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificRecord; /** * Serialization for Avro Reflect classes. For a class to be accepted by this @@ -47,10 +49,18 @@ public class AvroReflectSerialization extends AvroSerialization{ private Set packages; - public synchronized boolean accept(Class c) { + @Override + public synchronized boolean accept(Map metadata) { if (packages == null) { getPackages(); } + if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) { + return true; + } + Class c = getClassFromMetadata(metadata); + if (c == null) { + return false; + } return AvroReflectSerializable.class.isAssignableFrom(c) || packages.contains(c.getPackage().getName()); } @@ -65,8 +75,11 @@ public class AvroReflectSerialization extends AvroSerialization{ } } - protected DatumReader getReader(Class clazz) { + @Override + protected DatumReader getReader(Map metadata) { try { + Class clazz = (Class) + getClassFromMetadata(metadata); String prefix = ((clazz.getEnclosingClass() == null || "null".equals(clazz.getEnclosingClass().getName())) ? @@ -78,11 +91,13 @@ public class AvroReflectSerialization extends AvroSerialization{ } } - protected Schema getSchema(Object t) { + @Override + protected Schema getSchema(Object t, Map metadata) { return ReflectData.getSchema(t.getClass()); } - protected DatumWriter getWriter(Class clazz) { + @Override + protected DatumWriter getWriter(Map metadata) { return new ReflectDatumWriter(); } diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java index 54667c98685..fe8f45e57fd 100644 --- a/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java @@ -21,92 +21,105 @@ package org.apache.hadoop.io.serializer.avro; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serialization; -import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.io.serializer.DeserializerBase; +import org.apache.hadoop.io.serializer.SerializationBase; +import org.apache.hadoop.io.serializer.SerializerBase; /** * Base class for providing serialization to Avro types. */ -public abstract class AvroSerialization extends Configured - implements Serialization{ +public abstract class AvroSerialization extends SerializationBase { + + public static final String AVRO_SCHEMA_KEY = "Avro-Schema"; - public Deserializer getDeserializer(Class c) { - return new AvroDeserializer(c); + public DeserializerBase getDeserializer(Map metadata) { + return new AvroDeserializer(metadata); } - public Serializer getSerializer(Class c) { - return new AvroSerializer(c); + public SerializerBase getSerializer(Map metadata) { + return new AvroSerializer(metadata); } /** - * Return an Avro Schema instance for the given class. + * Return an Avro Schema instance for the given class and metadata. */ - protected abstract Schema getSchema(T t); + protected abstract Schema getSchema(T t, Map metadata); /** - * Create and return Avro DatumWriter for the given class. + * Create and return Avro DatumWriter for the given metadata. */ - protected abstract DatumWriter getWriter(Class clazz); + protected abstract DatumWriter getWriter(Map metadata); /** - * Create and return Avro DatumReader for the given class. + * Create and return Avro DatumReader for the given metadata. */ - protected abstract DatumReader getReader(Class clazz); + protected abstract DatumReader getReader(Map metadata); - class AvroSerializer implements Serializer { + class AvroSerializer extends SerializerBase { + private Map metadata; private DatumWriter writer; private BinaryEncoder encoder; private OutputStream outStream; - protected Class clazz; - AvroSerializer(Class clazz) { - writer = getWriter(clazz); + AvroSerializer(Map metadata) { + this.metadata = metadata; + writer = getWriter(metadata); } + @Override public void close() throws IOException { encoder.flush(); outStream.close(); } + @Override public void open(OutputStream out) throws IOException { outStream = out; encoder = new BinaryEncoder(out); } + @Override public void serialize(T t) throws IOException { - writer.setSchema(getSchema(t)); + writer.setSchema(getSchema(t, metadata)); writer.write(t, encoder); } + @Override + public Map getMetadata() throws IOException { + return metadata; + } + } - class AvroDeserializer implements Deserializer { + class AvroDeserializer extends DeserializerBase { private DatumReader reader; private BinaryDecoder decoder; private InputStream inStream; - AvroDeserializer(Class clazz) { - this.reader = getReader(clazz); + AvroDeserializer(Map metadata) { + this.reader = getReader(metadata); } + @Override public void close() throws IOException { inStream.close(); } + @Override public T deserialize(T t) throws IOException { return reader.read(t, decoder); } + @Override public void open(InputStream in) throws IOException { inStream = in; decoder = new BinaryDecoder(in); diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java index a44e402906d..a953e04d4ab 100644 --- a/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java @@ -18,7 +18,10 @@ package org.apache.hadoop.io.serializer.avro; +import java.util.Map; + import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.specific.SpecificDatumReader; @@ -33,23 +36,33 @@ import org.apache.avro.specific.SpecificRecord; public class AvroSpecificSerialization extends AvroSerialization{ - public boolean accept(Class c) { - return SpecificRecord.class.isAssignableFrom(c); + @Override + public boolean accept(Map metadata) { + if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) { + return true; + } + Class c = getClassFromMetadata(metadata); + return c == null ? false : SpecificRecord.class.isAssignableFrom(c); } - protected DatumReader getReader(Class clazz) { + @Override + protected DatumReader getReader(Map metadata) { try { + Class clazz = (Class) + getClassFromMetadata(metadata); return new SpecificDatumReader(clazz.newInstance().schema()); } catch (Exception e) { throw new RuntimeException(e); } } - protected Schema getSchema(SpecificRecord t) { + @Override + protected Schema getSchema(SpecificRecord t, Map metadata) { return t.schema(); } - protected DatumWriter getWriter(Class clazz) { + @Override + protected DatumWriter getWriter(Map metadata) { return new SpecificDatumWriter(); } diff --git a/src/java/org/apache/hadoop/util/ReflectionUtils.java b/src/java/org/apache/hadoop/util/ReflectionUtils.java index d1718bf3560..e09a82769ad 100644 --- a/src/java/org/apache/hadoop/util/ReflectionUtils.java +++ b/src/java/org/apache/hadoop/util/ReflectionUtils.java @@ -18,21 +18,27 @@ package org.apache.hadoop.util; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.lang.reflect.Constructor; import java.lang.reflect.Method; -import java.io.*; -import java.lang.management.*; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; -import org.apache.hadoop.conf.*; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.DeserializerBase; +import org.apache.hadoop.io.serializer.SerializationBase; import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.io.serializer.SerializerBase; /** * General reflection utils @@ -269,11 +275,12 @@ public class ReflectionUtils { buffer.outBuffer.reset(); SerializationFactory factory = getFactory(conf); Class cls = (Class) src.getClass(); - Serializer serializer = factory.getSerializer(cls); + Map metadata = SerializationBase.getMetadataFromClass(cls); + SerializerBase serializer = factory.getSerializer(metadata); serializer.open(buffer.outBuffer); serializer.serialize(src); buffer.moveData(); - Deserializer deserializer = factory.getDeserializer(cls); + DeserializerBase deserializer = factory.getDeserializer(metadata); deserializer.open(buffer.inBuffer); dst = deserializer.deserialize(dst); return dst; diff --git a/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java b/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java index 8d47db26cd2..eaa4d466886 100644 --- a/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java +++ b/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.io.serializer; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -33,19 +35,33 @@ public class SerializationTestUtil { * @return deserialized item */ public static K testSerialization(Configuration conf, K before) - throws Exception { - + throws Exception { + Map metadata = + SerializationBase.getMetadataFromClass(GenericsUtil.getClass(before)); + return testSerialization(conf, metadata, before); + } + + /** + * A utility that tests serialization/deserialization. + * @param conf configuration to use, "io.serializations" is read to + * determine the serialization + * @param metadata the metadata to pass to the serializer/deserializer + * @param the class of the item + * @param before item to (de)serialize + * @return deserialized item + */ + public static K testSerialization(Configuration conf, + Map metadata, K before) throws Exception { + SerializationFactory factory = new SerializationFactory(conf); - Serializer serializer - = factory.getSerializer(GenericsUtil.getClass(before)); - Deserializer deserializer - = factory.getDeserializer(GenericsUtil.getClass(before)); - + SerializerBase serializer = factory.getSerializer(metadata); + DeserializerBase deserializer = factory.getDeserializer(metadata); + DataOutputBuffer out = new DataOutputBuffer(); serializer.open(out); serializer.serialize(before); serializer.close(); - + DataInputBuffer in = new DataInputBuffer(); in.reset(out.getData(), out.getLength()); deserializer.open(in); diff --git a/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java b/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java index e65a299fa3c..c3e4a260140 100644 --- a/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java +++ b/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java @@ -18,9 +18,14 @@ package org.apache.hadoop.io.serializer.avro; +import java.util.HashMap; +import java.util.Map; + import junit.framework.TestCase; +import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.serializer.SerializationBase; import org.apache.hadoop.io.serializer.SerializationTestUtil; public class TestAvroSerialization extends TestCase { @@ -59,6 +64,16 @@ public class TestAvroSerialization extends TestCase { SerializationTestUtil.testSerialization(conf, before); assertEquals(before, after); } + + public void testGeneric() throws Exception { + Utf8 before = new Utf8("hadoop"); + Map metadata = new HashMap(); + metadata.put(SerializationBase.SERIALIZATION_KEY, + AvroGenericSerialization.class.getName()); + metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, "\"string\""); + Utf8 after = SerializationTestUtil.testSerialization(conf, metadata, before); + assertEquals(before, after); + } public static class InnerRecord { public int x = 7;