HADOOP-6698. Revert the io.serialization package to 0.20.2's API. Reverts HADOOP-6165, HADOOP-6443, HADOOP-6323, and HADOOP-6420.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@939412 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
00cb892150
commit
750fb2dbc1
11
CHANGES.txt
11
CHANGES.txt
|
@ -32,9 +32,6 @@ Trunk (unreleased changes)
|
||||||
HADOOP-6392. Run namenode and jobtracker on separate EC2 instances.
|
HADOOP-6392. Run namenode and jobtracker on separate EC2 instances.
|
||||||
(tomwhite)
|
(tomwhite)
|
||||||
|
|
||||||
HADOOP-6323. Add comparators to the serialization API.
|
|
||||||
(Aaron Kimball via cutting)
|
|
||||||
|
|
||||||
HADOOP-6433. Introduce asynchronous deletion of files via a pool of
|
HADOOP-6433. Introduce asynchronous deletion of files via a pool of
|
||||||
threads. This can be used to delete files in the Distributed
|
threads. This can be used to delete files in the Distributed
|
||||||
Cache. (Zheng Shao via dhruba)
|
Cache. (Zheng Shao via dhruba)
|
||||||
|
@ -129,18 +126,12 @@ Trunk (unreleased changes)
|
||||||
HADOOP-6472. add tokenCache option to GenericOptionsParser for passing
|
HADOOP-6472. add tokenCache option to GenericOptionsParser for passing
|
||||||
file with secret keys to a map reduce job. (boryas)
|
file with secret keys to a map reduce job. (boryas)
|
||||||
|
|
||||||
HADOOP-6443. Serialization classes accept invalid metadata.
|
|
||||||
(Aaron Kimball via tomwhite)
|
|
||||||
|
|
||||||
HADOOP-3205. Read multiple chunks directly from FSInputChecker subclass
|
HADOOP-3205. Read multiple chunks directly from FSInputChecker subclass
|
||||||
into user buffers. (Todd Lipcon via tomwhite)
|
into user buffers. (Todd Lipcon via tomwhite)
|
||||||
|
|
||||||
HADOOP-6479. TestUTF8 assertions could fail with better text.
|
HADOOP-6479. TestUTF8 assertions could fail with better text.
|
||||||
(Steve Loughran via tomwhite)
|
(Steve Loughran via tomwhite)
|
||||||
|
|
||||||
HADOOP-6420. Add functionality permitting subsets of Configuration to be
|
|
||||||
interpreted as Map<String,String>. (Aaron Kimball via cdouglas)
|
|
||||||
|
|
||||||
HADOOP-6155. Deprecate RecordIO anticipating Avro. (Tom White via cdouglas)
|
HADOOP-6155. Deprecate RecordIO anticipating Avro. (Tom White via cdouglas)
|
||||||
|
|
||||||
HADOOP-6492. Make some Avro serialization APIs public.
|
HADOOP-6492. Make some Avro serialization APIs public.
|
||||||
|
@ -564,8 +555,6 @@ Release 0.21.0 - Unreleased
|
||||||
the io package and makes it available to other users (MAPREDUCE-318).
|
the io package and makes it available to other users (MAPREDUCE-318).
|
||||||
(Jothi Padmanabhan via ddas)
|
(Jothi Padmanabhan via ddas)
|
||||||
|
|
||||||
HADOOP-6165. Add metadata to Serializations. (tomwhite)
|
|
||||||
|
|
||||||
HADOOP-6105. Adds support for automatically handling deprecation of
|
HADOOP-6105. Adds support for automatically handling deprecation of
|
||||||
configuration keys. (V.V.Chaitanya Krishna via yhemanth)
|
configuration keys. (V.V.Chaitanya Krishna via yhemanth)
|
||||||
|
|
||||||
|
|
|
@ -124,7 +124,7 @@
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>io.serializations</name>
|
<name>io.serializations</name>
|
||||||
<value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization,org.apache.hadoop.io.serializer.avro.AvroGenericSerialization</value>
|
<value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
|
||||||
<description>A list of serialization classes that can be used for
|
<description>A list of serialization classes that can be used for
|
||||||
obtaining serializers and deserializers.</description>
|
obtaining serializers and deserializers.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
|
@ -31,7 +31,6 @@ import java.io.OutputStreamWriter;
|
||||||
import java.io.Reader;
|
import java.io.Reader;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.AbstractMap;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -1047,138 +1046,6 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
|
||||||
set(name, StringUtils.arrayToString(values));
|
set(name, StringUtils.arrayToString(values));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Instantiates a map view over a subset of the entries in
|
|
||||||
* the Configuration. This is instantiated by getMap(), which
|
|
||||||
* binds a prefix of the namespace to the ConfigItemMap. This
|
|
||||||
* mapping reflects changes to the underlying Configuration.
|
|
||||||
*
|
|
||||||
* This map does not support iteration.
|
|
||||||
*/
|
|
||||||
protected class ConfigItemMap extends AbstractMap<String, String>
|
|
||||||
implements Map<String, String> {
|
|
||||||
|
|
||||||
private final String prefix;
|
|
||||||
|
|
||||||
public ConfigItemMap(String prefix) {
|
|
||||||
this.prefix = prefix;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean containsKey(Object key) {
|
|
||||||
return lookup(key.toString()) != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<Map.Entry<String, String>> entrySet() {
|
|
||||||
throw new UnsupportedOperationException("unsupported");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
return o != null && o instanceof ConfigItemMap
|
|
||||||
&& prefix.equals(((ConfigItemMap) o).prefix)
|
|
||||||
&& Configuration.this == ((ConfigItemMap) o).getConfiguration();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Configuration getConfiguration() {
|
|
||||||
return Configuration.this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String get(Object key) {
|
|
||||||
if (null == key) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
return lookup(key.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return prefix.hashCode();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String put(String key, String val) {
|
|
||||||
if (null == key) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
String ret = get(key);
|
|
||||||
Configuration.this.set(prefix + key, val);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void putAll(Map<? extends String, ? extends String> m) {
|
|
||||||
for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
|
|
||||||
put(entry.getKey(), entry.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String lookup(String subKey) {
|
|
||||||
String configKey = prefix + subKey;
|
|
||||||
Properties props = Configuration.this.getProps();
|
|
||||||
Object val = props.get(configKey);
|
|
||||||
String str = null;
|
|
||||||
if (null != val) {
|
|
||||||
str = substituteVars(val.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
return str;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Given a string -> string map as a value, embed this in the
|
|
||||||
* Configuration by prepending 'name' to all the keys in the valueMap,
|
|
||||||
* and storing it inside the current Configuration.
|
|
||||||
*
|
|
||||||
* e.g., setMap("foo", { "bar" -> "a", "baz" -> "b" }) would
|
|
||||||
* insert "foo.bar" -> "a" and "foo.baz" -> "b" in this
|
|
||||||
* Configuration.
|
|
||||||
*
|
|
||||||
* @param name the prefix to attach to all keys in the valueMap. This
|
|
||||||
* should not have a trailing "." character.
|
|
||||||
* @param valueMap the map to embed in the Configuration.
|
|
||||||
*/
|
|
||||||
public void setMap(String name, Map<String, String> valueMap) {
|
|
||||||
// Store all elements of the map proper.
|
|
||||||
for (Map.Entry<String, String> entry : valueMap.entrySet()) {
|
|
||||||
set(name + "." + entry.getKey(), entry.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a map containing a view of all configuration properties
|
|
||||||
* whose names begin with "name.*", with the "name." prefix removed.
|
|
||||||
* e.g., if "foo.bar" -> "a" and "foo.baz" -> "b" are in the
|
|
||||||
* Configuration, getMap("foo") would return { "bar" -> "a",
|
|
||||||
* "baz" -> "b" }.
|
|
||||||
*
|
|
||||||
* Map name deprecation is handled via "prefix deprecation"; the individual
|
|
||||||
* keys created in a configuration by inserting a map do not need to be
|
|
||||||
* individually deprecated -- it is sufficient to deprecate the 'name'
|
|
||||||
* associated with the map and bind that to a new name. e.g., if "foo"
|
|
||||||
* is deprecated for "newfoo," and the configuration contains entries for
|
|
||||||
* "newfoo.a" and "newfoo.b", getMap("foo") will return a map containing
|
|
||||||
* the keys "a" and "b".
|
|
||||||
*
|
|
||||||
* The returned map does not support iteration; it is a lazy view over
|
|
||||||
* the slice of the configuration whose keys begin with 'name'. Updates
|
|
||||||
* to the underlying configuration are reflected in the returned map,
|
|
||||||
* and updates to the map will modify the underlying configuration.
|
|
||||||
*
|
|
||||||
* @param name The prefix of the key names to extract into the output map.
|
|
||||||
* @return a String->String map that contains all (k, v) pairs
|
|
||||||
* where 'k' begins with 'name.'; the 'name.' prefix is removed in the output.
|
|
||||||
*/
|
|
||||||
public Map<String, String> getMap(String name) {
|
|
||||||
String prefix = handleDeprecation(name) + ".";
|
|
||||||
return new ConfigItemMap(prefix);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load a class by name.
|
* Load a class by name.
|
||||||
*
|
*
|
||||||
|
|
|
@ -21,21 +21,20 @@ package org.apache.hadoop.io;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.UnsupportedCharsetException;
|
import java.nio.charset.UnsupportedCharsetException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.serializer.DeserializerBase;
|
import org.apache.hadoop.io.serializer.Deserializer;
|
||||||
import org.apache.hadoop.io.serializer.SerializationBase;
|
import org.apache.hadoop.io.serializer.Serialization;
|
||||||
import org.apache.hadoop.io.serializer.SerializationFactory;
|
import org.apache.hadoop.io.serializer.SerializationFactory;
|
||||||
import org.apache.hadoop.io.serializer.SerializerBase;
|
import org.apache.hadoop.io.serializer.Serializer;
|
||||||
import org.apache.hadoop.util.GenericsUtil;
|
import org.apache.hadoop.util.GenericsUtil;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DefaultStringifier is the default implementation of the {@link Stringifier}
|
* DefaultStringifier is the default implementation of the {@link Stringifier}
|
||||||
* interface which stringifies the objects using base64 encoding of the
|
* interface which stringifies the objects using base64 encoding of the
|
||||||
* serialized version of the objects. The {@link SerializerBase} and
|
* serialized version of the objects. The {@link Serializer} and
|
||||||
* {@link DeserializerBase} are obtained from the {@link SerializationFactory}.
|
* {@link Deserializer} are obtained from the {@link SerializationFactory}.
|
||||||
* <br>
|
* <br>
|
||||||
* DefaultStringifier offers convenience methods to store/load objects to/from
|
* DefaultStringifier offers convenience methods to store/load objects to/from
|
||||||
* the configuration.
|
* the configuration.
|
||||||
|
@ -46,9 +45,9 @@ public class DefaultStringifier<T> implements Stringifier<T> {
|
||||||
|
|
||||||
private static final String SEPARATOR = ",";
|
private static final String SEPARATOR = ",";
|
||||||
|
|
||||||
private SerializerBase<T> serializer;
|
private Serializer<T> serializer;
|
||||||
|
|
||||||
private DeserializerBase<T> deserializer;
|
private Deserializer<T> deserializer;
|
||||||
|
|
||||||
private DataInputBuffer inBuf;
|
private DataInputBuffer inBuf;
|
||||||
|
|
||||||
|
@ -57,9 +56,8 @@ public class DefaultStringifier<T> implements Stringifier<T> {
|
||||||
public DefaultStringifier(Configuration conf, Class<T> c) {
|
public DefaultStringifier(Configuration conf, Class<T> c) {
|
||||||
|
|
||||||
SerializationFactory factory = new SerializationFactory(conf);
|
SerializationFactory factory = new SerializationFactory(conf);
|
||||||
Map<String, String> metadata = SerializationBase.getMetadataFromClass(c);
|
this.serializer = factory.getSerializer(c);
|
||||||
this.serializer = factory.getSerializer(metadata);
|
this.deserializer = factory.getDeserializer(c);
|
||||||
this.deserializer = factory.getDeserializer(metadata);
|
|
||||||
this.inBuf = new DataInputBuffer();
|
this.inBuf = new DataInputBuffer();
|
||||||
this.outBuf = new DataOutputBuffer();
|
this.outBuf = new DataOutputBuffer();
|
||||||
try {
|
try {
|
||||||
|
@ -104,7 +102,7 @@ public class DefaultStringifier<T> implements Stringifier<T> {
|
||||||
* @param item the object to be stored
|
* @param item the object to be stored
|
||||||
* @param keyName the name of the key to use
|
* @param keyName the name of the key to use
|
||||||
* @throws IOException : forwards Exceptions from the underlying
|
* @throws IOException : forwards Exceptions from the underlying
|
||||||
* {@link SerializationBase} classes.
|
* {@link Serialization} classes.
|
||||||
*/
|
*/
|
||||||
public static <K> void store(Configuration conf, K item, String keyName)
|
public static <K> void store(Configuration conf, K item, String keyName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -124,7 +122,7 @@ public class DefaultStringifier<T> implements Stringifier<T> {
|
||||||
* @param itemClass the class of the item
|
* @param itemClass the class of the item
|
||||||
* @return restored object
|
* @return restored object
|
||||||
* @throws IOException : forwards Exceptions from the underlying
|
* @throws IOException : forwards Exceptions from the underlying
|
||||||
* {@link SerializationBase} classes.
|
* {@link Serialization} classes.
|
||||||
*/
|
*/
|
||||||
public static <K> K load(Configuration conf, String keyName,
|
public static <K> K load(Configuration conf, String keyName,
|
||||||
Class<K> itemClass) throws IOException {
|
Class<K> itemClass) throws IOException {
|
||||||
|
@ -147,7 +145,7 @@ public class DefaultStringifier<T> implements Stringifier<T> {
|
||||||
* @param keyName the name of the key to use
|
* @param keyName the name of the key to use
|
||||||
* @throws IndexOutOfBoundsException if the items array is empty
|
* @throws IndexOutOfBoundsException if the items array is empty
|
||||||
* @throws IOException : forwards Exceptions from the underlying
|
* @throws IOException : forwards Exceptions from the underlying
|
||||||
* {@link SerializationBase} classes.
|
* {@link Serialization} classes.
|
||||||
*/
|
*/
|
||||||
public static <K> void storeArray(Configuration conf, K[] items,
|
public static <K> void storeArray(Configuration conf, K[] items,
|
||||||
String keyName) throws IOException {
|
String keyName) throws IOException {
|
||||||
|
@ -175,7 +173,7 @@ public class DefaultStringifier<T> implements Stringifier<T> {
|
||||||
* @param itemClass the class of the item
|
* @param itemClass the class of the item
|
||||||
* @return restored object
|
* @return restored object
|
||||||
* @throws IOException : forwards Exceptions from the underlying
|
* @throws IOException : forwards Exceptions from the underlying
|
||||||
* {@link SerializationBase} classes.
|
* {@link Serialization} classes.
|
||||||
*/
|
*/
|
||||||
public static <K> K[] loadArray(Configuration conf, String keyName,
|
public static <K> K[] loadArray(Configuration conf, String keyName,
|
||||||
Class<K> itemClass) throws IOException {
|
Class<K> itemClass) throws IOException {
|
||||||
|
|
|
@ -33,9 +33,8 @@ import org.apache.hadoop.io.compress.Decompressor;
|
||||||
import org.apache.hadoop.io.compress.DefaultCodec;
|
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
|
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
|
||||||
import org.apache.hadoop.io.serializer.DeserializerBase;
|
import org.apache.hadoop.io.serializer.Deserializer;
|
||||||
import org.apache.hadoop.io.serializer.SerializationBase;
|
import org.apache.hadoop.io.serializer.Serializer;
|
||||||
import org.apache.hadoop.io.serializer.SerializerBase;
|
|
||||||
import org.apache.hadoop.io.serializer.SerializationFactory;
|
import org.apache.hadoop.io.serializer.SerializationFactory;
|
||||||
import org.apache.hadoop.conf.*;
|
import org.apache.hadoop.conf.*;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
@ -706,14 +705,6 @@ public class SequenceFile {
|
||||||
return new TreeMap<Text, Text>(this.theMetadata);
|
return new TreeMap<Text, Text>(this.theMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, String> getMetadataAsStringMap() {
|
|
||||||
Map<String, String> map = new HashMap<String, String>();
|
|
||||||
for (Map.Entry<Text, Text> entry : theMetadata.entrySet()) {
|
|
||||||
map.put(entry.getKey().toString(), entry.getValue().toString());
|
|
||||||
}
|
|
||||||
return map;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void write(DataOutput out) throws IOException {
|
public void write(DataOutput out) throws IOException {
|
||||||
out.writeInt(this.theMetadata.size());
|
out.writeInt(this.theMetadata.size());
|
||||||
Iterator<Map.Entry<Text, Text>> iter =
|
Iterator<Map.Entry<Text, Text>> iter =
|
||||||
|
@ -810,9 +801,9 @@ public class SequenceFile {
|
||||||
Metadata metadata = null;
|
Metadata metadata = null;
|
||||||
Compressor compressor = null;
|
Compressor compressor = null;
|
||||||
|
|
||||||
protected SerializerBase keySerializer;
|
protected Serializer keySerializer;
|
||||||
protected SerializerBase uncompressedValSerializer;
|
protected Serializer uncompressedValSerializer;
|
||||||
protected SerializerBase compressedValSerializer;
|
protected Serializer compressedValSerializer;
|
||||||
|
|
||||||
// Insert a globally unique 16-byte value every few entries, so that one
|
// Insert a globally unique 16-byte value every few entries, so that one
|
||||||
// can seek into the middle of a file and then synchronize with record
|
// can seek into the middle of a file and then synchronize with record
|
||||||
|
@ -923,10 +914,9 @@ public class SequenceFile {
|
||||||
this.codec = codec;
|
this.codec = codec;
|
||||||
this.metadata = metadata;
|
this.metadata = metadata;
|
||||||
SerializationFactory serializationFactory = new SerializationFactory(conf);
|
SerializationFactory serializationFactory = new SerializationFactory(conf);
|
||||||
this.keySerializer = getSerializer(serializationFactory, keyClass, metadata);
|
this.keySerializer = serializationFactory.getSerializer(keyClass);
|
||||||
this.keySerializer.open(buffer);
|
this.keySerializer.open(buffer);
|
||||||
this.uncompressedValSerializer = getSerializer(serializationFactory,
|
this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
|
||||||
valClass, metadata);
|
|
||||||
this.uncompressedValSerializer.open(buffer);
|
this.uncompressedValSerializer.open(buffer);
|
||||||
if (this.codec != null) {
|
if (this.codec != null) {
|
||||||
ReflectionUtils.setConf(this.codec, this.conf);
|
ReflectionUtils.setConf(this.codec, this.conf);
|
||||||
|
@ -934,20 +924,11 @@ public class SequenceFile {
|
||||||
this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
|
this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
|
||||||
this.deflateOut =
|
this.deflateOut =
|
||||||
new DataOutputStream(new BufferedOutputStream(deflateFilter));
|
new DataOutputStream(new BufferedOutputStream(deflateFilter));
|
||||||
this.compressedValSerializer = getSerializer(serializationFactory,
|
this.compressedValSerializer = serializationFactory.getSerializer(valClass);
|
||||||
valClass, metadata);
|
|
||||||
this.compressedValSerializer.open(deflateOut);
|
this.compressedValSerializer.open(deflateOut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private SerializerBase getSerializer(SerializationFactory sf, Class c,
|
|
||||||
Metadata metadata) {
|
|
||||||
Map<String, String> stringMetadata = metadata.getMetadataAsStringMap();
|
|
||||||
stringMetadata.put(SerializationBase.CLASS_KEY, c.getName());
|
|
||||||
return sf.getSerializer(stringMetadata);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the class of keys in this file. */
|
/** Returns the class of keys in this file. */
|
||||||
public Class getKeyClass() { return keyClass; }
|
public Class getKeyClass() { return keyClass; }
|
||||||
|
|
||||||
|
@ -1432,8 +1413,8 @@ public class SequenceFile {
|
||||||
private DataInputStream valIn = null;
|
private DataInputStream valIn = null;
|
||||||
private Decompressor valDecompressor = null;
|
private Decompressor valDecompressor = null;
|
||||||
|
|
||||||
private DeserializerBase keyDeserializer;
|
private Deserializer keyDeserializer;
|
||||||
private DeserializerBase valDeserializer;
|
private Deserializer valDeserializer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a reader by opening a file from the given file system.
|
* Construct a reader by opening a file from the given file system.
|
||||||
|
@ -1630,24 +1611,21 @@ public class SequenceFile {
|
||||||
SerializationFactory serializationFactory =
|
SerializationFactory serializationFactory =
|
||||||
new SerializationFactory(conf);
|
new SerializationFactory(conf);
|
||||||
this.keyDeserializer =
|
this.keyDeserializer =
|
||||||
getDeserializer(serializationFactory, getKeyClass(), metadata);
|
getDeserializer(serializationFactory, getKeyClass());
|
||||||
if (!blockCompressed) {
|
if (!blockCompressed) {
|
||||||
this.keyDeserializer.open(valBuffer);
|
this.keyDeserializer.open(valBuffer);
|
||||||
} else {
|
} else {
|
||||||
this.keyDeserializer.open(keyIn);
|
this.keyDeserializer.open(keyIn);
|
||||||
}
|
}
|
||||||
this.valDeserializer =
|
this.valDeserializer =
|
||||||
getDeserializer(serializationFactory, getValueClass(), metadata);
|
getDeserializer(serializationFactory, getValueClass());
|
||||||
this.valDeserializer.open(valIn);
|
this.valDeserializer.open(valIn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private DeserializerBase getDeserializer(SerializationFactory sf, Class c,
|
private Deserializer getDeserializer(SerializationFactory sf, Class c) {
|
||||||
Metadata metadata) {
|
return sf.getDeserializer(c);
|
||||||
Map<String, String> stringMetadata = metadata.getMetadataAsStringMap();
|
|
||||||
stringMetadata.put(SerializationBase.CLASS_KEY, c.getName());
|
|
||||||
return sf.getDeserializer(stringMetadata);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Close the file. */
|
/** Close the file. */
|
||||||
|
|
|
@ -34,7 +34,6 @@ import java.io.InputStream;
|
||||||
* </p>
|
* </p>
|
||||||
* @param <T>
|
* @param <T>
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
|
||||||
public interface Deserializer<T> {
|
public interface Deserializer<T> {
|
||||||
/**
|
/**
|
||||||
* <p>Prepare the deserializer for reading.</p>
|
* <p>Prepare the deserializer for reading.</p>
|
||||||
|
|
|
@ -1,46 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.io.serializer;
|
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configured;
|
|
||||||
|
|
||||||
public abstract class DeserializerBase<T> extends Configured
|
|
||||||
implements Closeable, Deserializer<T> {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>Prepare the deserializer for reading.</p>
|
|
||||||
*/
|
|
||||||
public abstract void open(InputStream in) throws IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>
|
|
||||||
* Deserialize the next object from the underlying input stream.
|
|
||||||
* If the object <code>t</code> is non-null then this deserializer
|
|
||||||
* <i>may</i> set its internal state to the next object read from the input
|
|
||||||
* stream. Otherwise, if the object <code>t</code> is null a new
|
|
||||||
* deserialized object will be created.
|
|
||||||
* </p>
|
|
||||||
* @return the deserialized object
|
|
||||||
*/
|
|
||||||
public abstract T deserialize(T t) throws IOException;
|
|
||||||
|
|
||||||
}
|
|
|
@ -52,13 +52,6 @@ public abstract class DeserializerComparator<T> implements RawComparator<T> {
|
||||||
this.deserializer.open(buffer);
|
this.deserializer.open(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected DeserializerComparator(DeserializerBase<T> deserializer)
|
|
||||||
throws IOException {
|
|
||||||
|
|
||||||
this.deserializer = deserializer;
|
|
||||||
this.deserializer.open(buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
|
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
|
|
@ -34,10 +34,10 @@ import org.apache.hadoop.io.RawComparator;
|
||||||
* </p>
|
* </p>
|
||||||
* @see JavaSerializationComparator
|
* @see JavaSerializationComparator
|
||||||
*/
|
*/
|
||||||
public class JavaSerialization extends SerializationBase<Serializable> {
|
public class JavaSerialization implements Serialization<Serializable> {
|
||||||
|
|
||||||
static class JavaSerializationDeserializer<T extends Serializable>
|
static class JavaSerializationDeserializer<T extends Serializable>
|
||||||
extends DeserializerBase<T> {
|
implements Deserializer<T> {
|
||||||
|
|
||||||
private ObjectInputStream ois;
|
private ObjectInputStream ois;
|
||||||
|
|
||||||
|
@ -65,15 +65,10 @@ public class JavaSerialization extends SerializationBase<Serializable> {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static class JavaSerializationSerializer<T extends Serializable>
|
static class JavaSerializationSerializer
|
||||||
extends SerializerBase<T> {
|
implements Serializer<Serializable> {
|
||||||
|
|
||||||
private ObjectOutputStream oos;
|
private ObjectOutputStream oos;
|
||||||
private Map<String, String> metadata;
|
|
||||||
|
|
||||||
public JavaSerializationSerializer(Map<String, String> metadata) {
|
|
||||||
this.metadata = metadata;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void open(OutputStream out) throws IOException {
|
public void open(OutputStream out) throws IOException {
|
||||||
oos = new ObjectOutputStream(out) {
|
oos = new ObjectOutputStream(out) {
|
||||||
|
@ -83,7 +78,7 @@ public class JavaSerialization extends SerializationBase<Serializable> {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
public void serialize(T object) throws IOException {
|
public void serialize(Serializable object) throws IOException {
|
||||||
oos.reset(); // clear (class) back-references
|
oos.reset(); // clear (class) back-references
|
||||||
oos.writeObject(object);
|
oos.writeObject(object);
|
||||||
}
|
}
|
||||||
|
@ -92,53 +87,18 @@ public class JavaSerialization extends SerializationBase<Serializable> {
|
||||||
oos.close();
|
oos.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, String> getMetadata() throws IOException {
|
|
||||||
return metadata;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean accept(Map<String, String> metadata) {
|
public boolean accept(Class<?> c) {
|
||||||
if (!checkSerializationKey(metadata)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
Class<?> c = getClassFromMetadata(metadata);
|
|
||||||
return Serializable.class.isAssignableFrom(c);
|
return Serializable.class.isAssignableFrom(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
public DeserializerBase<Serializable> getDeserializer(
|
public Deserializer<Serializable> getDeserializer(Class<Serializable> c) {
|
||||||
Map<String, String> metadata) {
|
|
||||||
return new JavaSerializationDeserializer<Serializable>();
|
return new JavaSerializationDeserializer<Serializable>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public SerializerBase<Serializable> getSerializer(
|
public Serializer<Serializable> getSerializer(Class<Serializable> c) {
|
||||||
Map<String, String> metadata) {
|
return new JavaSerializationSerializer();
|
||||||
return new JavaSerializationSerializer<Serializable>(metadata);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
|
||||||
public RawComparator<Serializable> getRawComparator(
|
|
||||||
Map<String, String> metadata) {
|
|
||||||
Class<?> klazz = getClassFromMetadata(metadata);
|
|
||||||
if (null == klazz) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"Cannot get comparator without " + SerializationBase.CLASS_KEY
|
|
||||||
+ " set in metadata");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Serializable.class.isAssignableFrom(klazz)) {
|
|
||||||
try {
|
|
||||||
return (RawComparator<Serializable>) new JavaSerializationComparator();
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"Could not instantiate JavaSerializationComparator for type "
|
|
||||||
+ klazz.getName(), ioe);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException("Class " + klazz.getName()
|
|
||||||
+ " is incompatible with JavaSerialization");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,47 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.io.serializer;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
class LegacyDeserializer<T> extends DeserializerBase<T> {
|
|
||||||
|
|
||||||
private Deserializer<T> deserializer;
|
|
||||||
|
|
||||||
public LegacyDeserializer(Deserializer<T> deserializer) {
|
|
||||||
this.deserializer = deserializer;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void open(InputStream in) throws IOException {
|
|
||||||
deserializer.open(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T deserialize(T t) throws IOException {
|
|
||||||
return deserializer.deserialize(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
deserializer.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,96 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.io.serializer;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.io.RawComparator;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>
|
|
||||||
* Wraps a legacy {@link Serialization} as a {@link SerializationBase}.
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* @param <T>
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
class LegacySerialization<T> extends SerializationBase<T> {
|
|
||||||
|
|
||||||
private Serialization<T> serialization;
|
|
||||||
|
|
||||||
public LegacySerialization(Serialization<T> serialization,
|
|
||||||
Configuration conf) {
|
|
||||||
this.serialization = serialization;
|
|
||||||
setConf(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
Serialization<T> getUnderlyingSerialization() {
|
|
||||||
return serialization;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Override
|
|
||||||
public boolean accept(Class<?> c) {
|
|
||||||
return serialization.accept(c);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Override
|
|
||||||
public Deserializer<T> getDeserializer(Class<T> c) {
|
|
||||||
return serialization.getDeserializer(c);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Override
|
|
||||||
public Serializer<T> getSerializer(Class<T> c) {
|
|
||||||
return serialization.getSerializer(c);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean accept(Map<String, String> metadata) {
|
|
||||||
Class<?> c = getClassFromMetadata(metadata);
|
|
||||||
return accept(c);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
|
||||||
public SerializerBase<T> getSerializer(Map<String, String> metadata) {
|
|
||||||
Class<T> c = (Class<T>) getClassFromMetadata(metadata);
|
|
||||||
return new LegacySerializer<T>(getSerializer(c));
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
|
||||||
public DeserializerBase<T> getDeserializer(Map<String, String> metadata) {
|
|
||||||
Class<T> c = (Class<T>) getClassFromMetadata(metadata);
|
|
||||||
return new LegacyDeserializer<T>(getDeserializer(c));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public RawComparator<T> getRawComparator(Map<String, String> metadata) {
|
|
||||||
// Since this method is being added to an API meant to provide legacy
|
|
||||||
// compatability with deprecated serializers, leaving this as an incomplete
|
|
||||||
// stub.
|
|
||||||
|
|
||||||
throw new UnsupportedOperationException(
|
|
||||||
"LegacySerialization does not provide raw comparators");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,54 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.io.serializer;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
class LegacySerializer<T> extends SerializerBase<T> {
|
|
||||||
|
|
||||||
private Serializer<T> serializer;
|
|
||||||
|
|
||||||
public LegacySerializer(Serializer<T> serializer) {
|
|
||||||
this.serializer = serializer;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void open(OutputStream out) throws IOException {
|
|
||||||
serializer.open(out);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void serialize(T t) throws IOException {
|
|
||||||
serializer.serialize(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
serializer.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, String> getMetadata() throws IOException {
|
|
||||||
return Collections.<String, String>emptyMap();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -24,7 +24,6 @@ package org.apache.hadoop.io.serializer;
|
||||||
* </p>
|
* </p>
|
||||||
* @param <T>
|
* @param <T>
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
|
||||||
public interface Serialization<T> {
|
public interface Serialization<T> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,117 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.io.serializer;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configured;
|
|
||||||
import org.apache.hadoop.io.RawComparator;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>
|
|
||||||
* Encapsulates a {@link SerializerBase}/{@link DeserializerBase} pair.
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* @param <T>
|
|
||||||
*/
|
|
||||||
public abstract class SerializationBase<T> extends Configured
|
|
||||||
implements Serialization<T> {
|
|
||||||
|
|
||||||
public static final String SERIALIZATION_KEY = "Serialization-Class";
|
|
||||||
public static final String CLASS_KEY = "Serialized-Class";
|
|
||||||
|
|
||||||
public static Map<String, String> getMetadataFromClass(Class<?> c) {
|
|
||||||
Map<String, String> metadata = new HashMap<String, String>();
|
|
||||||
metadata.put(CLASS_KEY, c.getName());
|
|
||||||
return metadata;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Override
|
|
||||||
public boolean accept(Class<?> c) {
|
|
||||||
return accept(getMetadataFromClass(c));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Override
|
|
||||||
public Deserializer<T> getDeserializer(Class<T> c) {
|
|
||||||
return getDeserializer(getMetadataFromClass(c));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Override
|
|
||||||
public Serializer<T> getSerializer(Class<T> c) {
|
|
||||||
return getSerializer(getMetadataFromClass(c));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Allows clients to test whether this {@link SerializationBase} supports the
|
|
||||||
* given metadata.
|
|
||||||
*/
|
|
||||||
public abstract boolean accept(Map<String, String> metadata);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return a {@link SerializerBase} for the given metadata.
|
|
||||||
*/
|
|
||||||
public abstract SerializerBase<T> getSerializer(Map<String, String> metadata);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return a {@link DeserializerBase} for the given metadata.
|
|
||||||
*/
|
|
||||||
public abstract DeserializerBase<T> getDeserializer(
|
|
||||||
Map<String, String> metadata);
|
|
||||||
|
|
||||||
public Class<?> getClassFromMetadata(Map<String, String> metadata) {
|
|
||||||
String classname = metadata.get(CLASS_KEY);
|
|
||||||
if (classname == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
return getConf().getClassByName(classname);
|
|
||||||
} catch (ClassNotFoundException e) {
|
|
||||||
throw new IllegalArgumentException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Provide a raw comparator for the specified serializable class.
|
|
||||||
* Requires a serialization-specific metadata entry to name the class
|
|
||||||
* to compare (e.g., "Serialized-Class" for JavaSerialization and
|
|
||||||
* WritableSerialization).
|
|
||||||
* @param metadata a set of string mappings providing serialization-specific
|
|
||||||
* arguments that parameterize the data being serialized/compared.
|
|
||||||
* @return a {@link RawComparator} for the given metadata.
|
|
||||||
* @throws UnsupportedOperationException if it cannot instantiate a RawComparator
|
|
||||||
* for this given metadata.
|
|
||||||
*/
|
|
||||||
public abstract RawComparator<T> getRawComparator(Map<String,String> metadata);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check that the SERIALIZATION_KEY, if set, matches the current class.
|
|
||||||
* @param metadata the serialization metadata to check.
|
|
||||||
* @return true if SERIALIZATION_KEY is unset, or if it matches the current class
|
|
||||||
* (meaning that accept() should continue processing), or false if it is a mismatch,
|
|
||||||
* meaning that accept() should return false.
|
|
||||||
*/
|
|
||||||
protected boolean checkSerializationKey(Map<String, String> metadata) {
|
|
||||||
String intendedSerializer = metadata.get(SERIALIZATION_KEY);
|
|
||||||
return intendedSerializer == null ||
|
|
||||||
getClass().getName().equals(intendedSerializer);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -20,13 +20,11 @@ package org.apache.hadoop.io.serializer;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.io.serializer.avro.AvroGenericSerialization;
|
|
||||||
import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
|
import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
|
||||||
import org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization;
|
import org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
@ -34,7 +32,7 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* A factory for {@link SerializationBase}s.
|
* A factory for {@link Serialization}s.
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
public class SerializationFactory extends Configured {
|
public class SerializationFactory extends Configured {
|
||||||
|
@ -42,10 +40,7 @@ public class SerializationFactory extends Configured {
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(SerializationFactory.class.getName());
|
LogFactory.getLog(SerializationFactory.class.getName());
|
||||||
|
|
||||||
private List<SerializationBase<?>> serializations =
|
private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();
|
||||||
new ArrayList<SerializationBase<?>>();
|
|
||||||
private List<SerializationBase<?>> legacySerializations =
|
|
||||||
new ArrayList<SerializationBase<?>>();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -59,8 +54,7 @@ public class SerializationFactory extends Configured {
|
||||||
for (String serializerName : conf.getStrings("io.serializations",
|
for (String serializerName : conf.getStrings("io.serializations",
|
||||||
new String[]{WritableSerialization.class.getName(),
|
new String[]{WritableSerialization.class.getName(),
|
||||||
AvroSpecificSerialization.class.getName(),
|
AvroSpecificSerialization.class.getName(),
|
||||||
AvroReflectSerialization.class.getName(),
|
AvroReflectSerialization.class.getName()})) {
|
||||||
AvroGenericSerialization.class.getName()})) {
|
|
||||||
add(conf, serializerName);
|
add(conf, serializerName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,64 +62,32 @@ public class SerializationFactory extends Configured {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private void add(Configuration conf, String serializationName) {
|
private void add(Configuration conf, String serializationName) {
|
||||||
try {
|
try {
|
||||||
Class<?> serializationClass = conf.getClassByName(serializationName);
|
Class<? extends Serialization> serializionClass =
|
||||||
if (SerializationBase.class.isAssignableFrom(serializationClass)) {
|
(Class<? extends Serialization>) conf.getClassByName(serializationName);
|
||||||
serializations.add((SerializationBase)
|
serializations.add((Serialization)
|
||||||
ReflectionUtils.newInstance(serializationClass, getConf()));
|
ReflectionUtils.newInstance(serializionClass, getConf()));
|
||||||
} else if (Serialization.class.isAssignableFrom(serializationClass)) {
|
|
||||||
Serialization serialization = (Serialization)
|
|
||||||
ReflectionUtils.newInstance(serializationClass, getConf());
|
|
||||||
legacySerializations.add(new LegacySerialization(serialization,
|
|
||||||
getConf()));
|
|
||||||
} else {
|
|
||||||
LOG.warn("Serialization class " + serializationName + " is not an " +
|
|
||||||
"instance of Serialization or BaseSerialization.");
|
|
||||||
}
|
|
||||||
} catch (ClassNotFoundException e) {
|
} catch (ClassNotFoundException e) {
|
||||||
LOG.warn("Serialization class not found: " +
|
LOG.warn("Serialization class not found: " +
|
||||||
StringUtils.stringifyException(e));
|
StringUtils.stringifyException(e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public <T> Serializer<T> getSerializer(Class<T> c) {
|
public <T> Serializer<T> getSerializer(Class<T> c) {
|
||||||
return getSerialization(c).getSerializer(c);
|
return getSerialization(c).getSerializer(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public <T> Deserializer<T> getDeserializer(Class<T> c) {
|
public <T> Deserializer<T> getDeserializer(Class<T> c) {
|
||||||
return getSerialization(c).getDeserializer(c);
|
return getSerialization(c).getDeserializer(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public <T> Serialization<T> getSerialization(Class<T> c) {
|
|
||||||
return getSerialization(SerializationBase.getMetadataFromClass(c));
|
|
||||||
}
|
|
||||||
|
|
||||||
public <T> SerializerBase<T> getSerializer(Map<String, String> metadata) {
|
|
||||||
SerializationBase<T> serialization = getSerialization(metadata);
|
|
||||||
return serialization.getSerializer(metadata);
|
|
||||||
}
|
|
||||||
|
|
||||||
public <T> DeserializerBase<T> getDeserializer(Map<String, String> metadata) {
|
|
||||||
SerializationBase<T> serialization = getSerialization(metadata);
|
|
||||||
return serialization.getDeserializer(metadata);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public <T> SerializationBase<T> getSerialization(Map<String, String> metadata) {
|
public <T> Serialization<T> getSerialization(Class<T> c) {
|
||||||
for (SerializationBase serialization : serializations) {
|
for (Serialization serialization : serializations) {
|
||||||
if (serialization.accept(metadata)) {
|
if (serialization.accept(c)) {
|
||||||
return (SerializationBase<T>) serialization;
|
return (Serialization<T>) serialization;
|
||||||
}
|
|
||||||
}
|
|
||||||
// Look in the legacy serializations last, since they ignore
|
|
||||||
// non-class metadata
|
|
||||||
for (SerializationBase serialization : legacySerializations) {
|
|
||||||
if (serialization.accept(metadata)) {
|
|
||||||
return (SerializationBase<T>) serialization;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.io.serializer;
|
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configured;
|
|
||||||
|
|
||||||
public abstract class SerializerBase<T> extends Configured
|
|
||||||
implements Closeable, Serializer<T> {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>Prepare the serializer for writing.</p>
|
|
||||||
*/
|
|
||||||
public abstract void open(OutputStream out) throws IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>Serialize <code>t</code> to the underlying output stream.</p>
|
|
||||||
*/
|
|
||||||
public abstract void serialize(T t) throws IOException;
|
|
||||||
|
|
||||||
public abstract Map<String, String> getMetadata() throws IOException;
|
|
||||||
|
|
||||||
}
|
|
|
@ -26,20 +26,19 @@ import java.io.OutputStream;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.RawComparator;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.WritableComparable;
|
|
||||||
import org.apache.hadoop.io.WritableComparator;
|
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link SerializationBase} for {@link Writable}s that delegates to
|
* A {@link Serialization} for {@link Writable}s that delegates to
|
||||||
* {@link Writable#write(java.io.DataOutput)} and
|
* {@link Writable#write(java.io.DataOutput)} and
|
||||||
* {@link Writable#readFields(java.io.DataInput)}.
|
* {@link Writable#readFields(java.io.DataInput)}.
|
||||||
*/
|
*/
|
||||||
public class WritableSerialization extends SerializationBase<Writable> {
|
public class WritableSerialization extends Configured
|
||||||
static class WritableDeserializer extends DeserializerBase<Writable> {
|
implements Serialization<Writable> {
|
||||||
|
static class WritableDeserializer extends Configured
|
||||||
|
implements Deserializer<Writable> {
|
||||||
|
|
||||||
private Class<?> writableClass;
|
private Class<?> writableClass;
|
||||||
private DataInputStream dataIn;
|
private DataInputStream dataIn;
|
||||||
|
@ -78,30 +77,10 @@ public class WritableSerialization extends SerializationBase<Writable> {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static class WritableSerializer extends SerializerBase<Writable> {
|
static class WritableSerializer extends Configured implements
|
||||||
|
Serializer<Writable> {
|
||||||
|
|
||||||
private Map<String, String> metadata;
|
|
||||||
private DataOutputStream dataOut;
|
private DataOutputStream dataOut;
|
||||||
private Class<?> serializedClass;
|
|
||||||
|
|
||||||
public WritableSerializer(Configuration conf,
|
|
||||||
Map<String, String> metadata) {
|
|
||||||
this.metadata = metadata;
|
|
||||||
|
|
||||||
// If this metadata specifies a serialized class, memoize the
|
|
||||||
// class object for this.
|
|
||||||
String className = this.metadata.get(CLASS_KEY);
|
|
||||||
if (null != className) {
|
|
||||||
try {
|
|
||||||
this.serializedClass = conf.getClassByName(className);
|
|
||||||
} catch (ClassNotFoundException cnfe) {
|
|
||||||
throw new RuntimeException(cnfe);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new UnsupportedOperationException("the "
|
|
||||||
+ CLASS_KEY + " metadata is missing, but is required.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void open(OutputStream out) {
|
public void open(OutputStream out) {
|
||||||
|
@ -114,10 +93,6 @@ public class WritableSerialization extends SerializationBase<Writable> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serialize(Writable w) throws IOException {
|
public void serialize(Writable w) throws IOException {
|
||||||
if (serializedClass != w.getClass()) {
|
|
||||||
throw new IOException("Type mismatch in serialization: expected "
|
|
||||||
+ serializedClass + "; received " + w.getClass());
|
|
||||||
}
|
|
||||||
w.write(dataOut);
|
w.write(dataOut);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,45 +101,21 @@ public class WritableSerialization extends SerializationBase<Writable> {
|
||||||
dataOut.close();
|
dataOut.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, String> getMetadata() throws IOException {
|
|
||||||
return metadata;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean accept(Map<String, String> metadata) {
|
public boolean accept(Class<?> c) {
|
||||||
if (!checkSerializationKey(metadata)) {
|
return Writable.class.isAssignableFrom(c);
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
Class<?> c = getClassFromMetadata(metadata);
|
|
||||||
return c == null ? false : Writable.class.isAssignableFrom(c);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SerializerBase<Writable> getSerializer(Map<String, String> metadata) {
|
public Serializer<Writable> getSerializer(Class<Writable> c) {
|
||||||
return new WritableSerializer(getConf(), metadata);
|
return new WritableSerializer();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DeserializerBase<Writable> getDeserializer(Map<String, String> metadata) {
|
public Deserializer<Writable> getDeserializer(Class<Writable> c) {
|
||||||
Class<?> c = getClassFromMetadata(metadata);
|
|
||||||
return new WritableDeserializer(getConf(), c);
|
return new WritableDeserializer(getConf(), c);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public RawComparator<Writable> getRawComparator(Map<String, String> metadata) {
|
|
||||||
Class<?> klazz = getClassFromMetadata(metadata);
|
|
||||||
if (null == klazz) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"Cannot get comparator without " + SerializationBase.CLASS_KEY
|
|
||||||
+ " set in metadata");
|
|
||||||
}
|
|
||||||
|
|
||||||
return (RawComparator) WritableComparator.get(
|
|
||||||
(Class<WritableComparable>)klazz);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,48 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.io.serializer.avro;
|
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.avro.io.BinaryData;
|
|
||||||
import org.apache.hadoop.io.RawComparator;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>
|
|
||||||
* A {@link RawComparator} that uses Avro to extract data from the
|
|
||||||
* source stream and compare their contents without explicit
|
|
||||||
* deserialization.
|
|
||||||
*/
|
|
||||||
public class AvroComparator<T extends Comparable<T>>
|
|
||||||
implements RawComparator<T> {
|
|
||||||
|
|
||||||
private final Schema schema;
|
|
||||||
|
|
||||||
public AvroComparator(final Schema s) {
|
|
||||||
this.schema = s;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
|
|
||||||
return BinaryData.compare(b1, s1, b2, s2, schema);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int compare(T t1, T t2) {
|
|
||||||
return t1.compareTo(t2);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,64 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.io.serializer.avro;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.avro.generic.GenericData;
|
|
||||||
import org.apache.avro.generic.GenericDatumReader;
|
|
||||||
import org.apache.avro.generic.GenericDatumWriter;
|
|
||||||
import org.apache.avro.io.DatumReader;
|
|
||||||
import org.apache.avro.io.DatumWriter;
|
|
||||||
import org.apache.hadoop.io.serializer.SerializationBase;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Serialization for Avro Generic classes. For a class to be accepted by this
|
|
||||||
* serialization it must have a schema specified.
|
|
||||||
* The schema used is the one set by {@link AvroSerialization#AVRO_SCHEMA_KEY}.
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public class AvroGenericSerialization extends AvroSerialization<Object> {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean accept(Map<String, String> metadata) {
|
|
||||||
if (!checkSerializationKey(metadata)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return metadata.get(AVRO_SCHEMA_KEY) != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public DatumReader getReader(Map<String, String> metadata) {
|
|
||||||
Schema schema = Schema.parse(metadata.get(AVRO_SCHEMA_KEY));
|
|
||||||
return new GenericDatumReader<Object>(schema);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Schema getSchema(Map<String, String> metadata) {
|
|
||||||
return Schema.parse(metadata.get(AVRO_SCHEMA_KEY));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public DatumWriter getWriter(Map<String, String> metadata) {
|
|
||||||
return new GenericDatumWriter<Object>();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.io.serializer.avro;
|
package org.apache.hadoop.io.serializer.avro;
|
||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
|
@ -28,7 +27,6 @@ import org.apache.avro.io.DatumWriter;
|
||||||
import org.apache.avro.reflect.ReflectData;
|
import org.apache.avro.reflect.ReflectData;
|
||||||
import org.apache.avro.reflect.ReflectDatumReader;
|
import org.apache.avro.reflect.ReflectDatumReader;
|
||||||
import org.apache.avro.reflect.ReflectDatumWriter;
|
import org.apache.avro.reflect.ReflectDatumWriter;
|
||||||
import org.apache.avro.specific.SpecificRecord;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Serialization for Avro Reflect classes. For a class to be accepted by this
|
* Serialization for Avro Reflect classes. For a class to be accepted by this
|
||||||
|
@ -50,17 +48,10 @@ public class AvroReflectSerialization extends AvroSerialization<Object>{
|
||||||
private Set<String> packages;
|
private Set<String> packages;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean accept(Map<String, String> metadata) {
|
public synchronized boolean accept(Class<?> c) {
|
||||||
if (packages == null) {
|
if (packages == null) {
|
||||||
getPackages();
|
getPackages();
|
||||||
}
|
}
|
||||||
if (!checkSerializationKey(metadata)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
Class<?> c = getClassFromMetadata(metadata);
|
|
||||||
if (c == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return AvroReflectSerializable.class.isAssignableFrom(c) ||
|
return AvroReflectSerializable.class.isAssignableFrom(c) ||
|
||||||
packages.contains(c.getPackage().getName());
|
packages.contains(c.getPackage().getName());
|
||||||
}
|
}
|
||||||
|
@ -76,22 +67,21 @@ public class AvroReflectSerialization extends AvroSerialization<Object>{
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatumReader getReader(Map<String, String> metadata) {
|
public DatumReader getReader(Class<Object> clazz) {
|
||||||
try {
|
try {
|
||||||
return new ReflectDatumReader(getClassFromMetadata(metadata));
|
return new ReflectDatumReader(clazz);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Schema getSchema(Map<String, String> metadata) {
|
public Schema getSchema(Object t) {
|
||||||
Class<?> c = getClassFromMetadata(metadata);
|
return ReflectData.get().getSchema(t.getClass());
|
||||||
return ReflectData.get().getSchema(c);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatumWriter getWriter(Map<String, String> metadata) {
|
public DatumWriter getWriter(Class<Object> clazz) {
|
||||||
return new ReflectDatumWriter();
|
return new ReflectDatumWriter();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,62 +21,57 @@ package org.apache.hadoop.io.serializer.avro;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.io.DecoderFactory;
|
|
||||||
import org.apache.avro.io.BinaryDecoder;
|
import org.apache.avro.io.BinaryDecoder;
|
||||||
import org.apache.avro.io.BinaryEncoder;
|
import org.apache.avro.io.BinaryEncoder;
|
||||||
import org.apache.avro.io.DatumReader;
|
import org.apache.avro.io.DatumReader;
|
||||||
import org.apache.avro.io.DatumWriter;
|
import org.apache.avro.io.DatumWriter;
|
||||||
import org.apache.hadoop.io.RawComparator;
|
import org.apache.avro.io.DecoderFactory;
|
||||||
import org.apache.hadoop.io.serializer.DeserializerBase;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.io.serializer.SerializationBase;
|
import org.apache.hadoop.io.serializer.Deserializer;
|
||||||
import org.apache.hadoop.io.serializer.SerializerBase;
|
import org.apache.hadoop.io.serializer.Serialization;
|
||||||
|
import org.apache.hadoop.io.serializer.Serializer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for providing serialization to Avro types.
|
* Base class for providing serialization to Avro types.
|
||||||
*/
|
*/
|
||||||
public abstract class AvroSerialization<T> extends SerializationBase<T> {
|
public abstract class AvroSerialization<T> extends Configured
|
||||||
|
implements Serialization<T>{
|
||||||
|
|
||||||
public static final String AVRO_SCHEMA_KEY = "Avro-Schema";
|
public static final String AVRO_SCHEMA_KEY = "Avro-Schema";
|
||||||
|
|
||||||
public DeserializerBase<T> getDeserializer(Map<String, String> metadata) {
|
public Deserializer<T> getDeserializer(Class<T> c) {
|
||||||
return new AvroDeserializer(metadata);
|
return new AvroDeserializer(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SerializerBase<T> getSerializer(Map<String, String> metadata) {
|
public Serializer<T> getSerializer(Class<T> c) {
|
||||||
return new AvroSerializer(metadata);
|
return new AvroSerializer(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return an Avro Schema instance for the given class and metadata.
|
* Return an Avro Schema instance for the given class.
|
||||||
*/
|
*/
|
||||||
public abstract Schema getSchema(Map<String, String> metadata);
|
public abstract Schema getSchema(T t);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create and return Avro DatumWriter for the given metadata.
|
* Create and return Avro DatumWriter for the given class.
|
||||||
*/
|
*/
|
||||||
public abstract DatumWriter<T> getWriter(Map<String, String> metadata);
|
public abstract DatumWriter<T> getWriter(Class<T> clazz);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create and return Avro DatumReader for the given metadata.
|
* Create and return Avro DatumReader for the given class.
|
||||||
*/
|
*/
|
||||||
public abstract DatumReader<T> getReader(Map<String, String> metadata);
|
public abstract DatumReader<T> getReader(Class<T> clazz);
|
||||||
|
|
||||||
class AvroSerializer extends SerializerBase<T> {
|
class AvroSerializer implements Serializer<T> {
|
||||||
|
|
||||||
private Map<String, String> metadata;
|
|
||||||
private DatumWriter<T> writer;
|
private DatumWriter<T> writer;
|
||||||
private BinaryEncoder encoder;
|
private BinaryEncoder encoder;
|
||||||
private OutputStream outStream;
|
private OutputStream outStream;
|
||||||
private Schema schema;
|
|
||||||
|
|
||||||
AvroSerializer(Map<String, String> metadata) {
|
AvroSerializer(Class<T> clazz) {
|
||||||
this.metadata = metadata;
|
this.writer = getWriter(clazz);
|
||||||
this.writer = getWriter(metadata);
|
|
||||||
this.schema = getSchema(this.metadata);
|
|
||||||
writer.setSchema(this.schema);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -93,24 +88,20 @@ public abstract class AvroSerialization<T> extends SerializationBase<T> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serialize(T t) throws IOException {
|
public void serialize(T t) throws IOException {
|
||||||
|
writer.setSchema(getSchema(t));
|
||||||
writer.write(t, encoder);
|
writer.write(t, encoder);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, String> getMetadata() throws IOException {
|
|
||||||
return metadata;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class AvroDeserializer extends DeserializerBase<T> {
|
class AvroDeserializer implements Deserializer<T> {
|
||||||
|
|
||||||
private DatumReader<T> reader;
|
private DatumReader<T> reader;
|
||||||
private BinaryDecoder decoder;
|
private BinaryDecoder decoder;
|
||||||
private InputStream inStream;
|
private InputStream inStream;
|
||||||
|
|
||||||
AvroDeserializer(Map<String, String> metadata) {
|
AvroDeserializer(Class<T> clazz) {
|
||||||
this.reader = getReader(metadata);
|
this.reader = getReader(clazz);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -131,18 +122,4 @@ public abstract class AvroSerialization<T> extends SerializationBase<T> {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
/**
|
|
||||||
* Provides a raw comparator for Avro-encoded serialized data.
|
|
||||||
* Requires that {@link AvroSerialization#AVRO_SCHEMA_KEY} be provided
|
|
||||||
* in the metadata argument.
|
|
||||||
* @param metadata the Avro-serialization-specific parameters being
|
|
||||||
* provided that detail the schema for the data to deserialize and compare.
|
|
||||||
* @return a RawComparator parameterized for the specified Avro schema.
|
|
||||||
*/
|
|
||||||
public RawComparator<T> getRawComparator(Map<String, String> metadata) {
|
|
||||||
Schema schema = getSchema(metadata);
|
|
||||||
return new AvroComparator(schema);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,13 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.io.serializer.avro;
|
package org.apache.hadoop.io.serializer.avro;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericDatumReader;
|
|
||||||
import org.apache.avro.io.DatumReader;
|
import org.apache.avro.io.DatumReader;
|
||||||
import org.apache.avro.io.DatumWriter;
|
import org.apache.avro.io.DatumWriter;
|
||||||
import org.apache.avro.specific.SpecificData;
|
|
||||||
import org.apache.avro.specific.SpecificDatumReader;
|
import org.apache.avro.specific.SpecificDatumReader;
|
||||||
import org.apache.avro.specific.SpecificDatumWriter;
|
import org.apache.avro.specific.SpecificDatumWriter;
|
||||||
import org.apache.avro.specific.SpecificRecord;
|
import org.apache.avro.specific.SpecificRecord;
|
||||||
|
@ -38,31 +34,26 @@ public class AvroSpecificSerialization
|
||||||
extends AvroSerialization<SpecificRecord>{
|
extends AvroSerialization<SpecificRecord>{
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean accept(Map<String, String> metadata) {
|
public boolean accept(Class<?> c) {
|
||||||
if (!checkSerializationKey(metadata)) {
|
return SpecificRecord.class.isAssignableFrom(c);
|
||||||
return false;
|
|
||||||
}
|
|
||||||
Class<?> c = getClassFromMetadata(metadata);
|
|
||||||
return c == null ? false : SpecificRecord.class.isAssignableFrom(c);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatumReader getReader(Map<String, String> metadata) {
|
public DatumReader getReader(Class<SpecificRecord> clazz) {
|
||||||
try {
|
try {
|
||||||
return new SpecificDatumReader(getClassFromMetadata(metadata));
|
return new SpecificDatumReader(clazz.newInstance().getSchema());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Schema getSchema(Map<String, String> metadata) {
|
public Schema getSchema(SpecificRecord t) {
|
||||||
Class<?> c = getClassFromMetadata(metadata);
|
return t.getSchema();
|
||||||
return SpecificData.get().getSchema(c);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatumWriter getWriter(Map<String, String> metadata) {
|
public DatumWriter getWriter(Class<SpecificRecord> clazz) {
|
||||||
return new SpecificDatumWriter();
|
return new SpecificDatumWriter();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,10 +35,9 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.serializer.DeserializerBase;
|
import org.apache.hadoop.io.serializer.Deserializer;
|
||||||
import org.apache.hadoop.io.serializer.SerializationBase;
|
|
||||||
import org.apache.hadoop.io.serializer.SerializationFactory;
|
import org.apache.hadoop.io.serializer.SerializationFactory;
|
||||||
import org.apache.hadoop.io.serializer.SerializerBase;
|
import org.apache.hadoop.io.serializer.Serializer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* General reflection utils
|
* General reflection utils
|
||||||
|
@ -275,12 +274,11 @@ public class ReflectionUtils {
|
||||||
buffer.outBuffer.reset();
|
buffer.outBuffer.reset();
|
||||||
SerializationFactory factory = getFactory(conf);
|
SerializationFactory factory = getFactory(conf);
|
||||||
Class<T> cls = (Class<T>) src.getClass();
|
Class<T> cls = (Class<T>) src.getClass();
|
||||||
Map<String, String> metadata = SerializationBase.getMetadataFromClass(cls);
|
Serializer<T> serializer = factory.getSerializer(cls);
|
||||||
SerializerBase<T> serializer = factory.getSerializer(metadata);
|
|
||||||
serializer.open(buffer.outBuffer);
|
serializer.open(buffer.outBuffer);
|
||||||
serializer.serialize(src);
|
serializer.serialize(src);
|
||||||
buffer.moveData();
|
buffer.moveData();
|
||||||
DeserializerBase<T> deserializer = factory.getDeserializer(metadata);
|
Deserializer<T> deserializer = factory.getDeserializer(cls);
|
||||||
deserializer.open(buffer.inBuffer);
|
deserializer.open(buffer.inBuffer);
|
||||||
dst = deserializer.deserialize(dst);
|
dst = deserializer.deserialize(dst);
|
||||||
return dst;
|
return dst;
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.io.IOException;
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -366,49 +365,6 @@ public class TestConfiguration extends TestCase {
|
||||||
assertTrue(fail);
|
assertTrue(fail);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMap() throws IOException {
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
|
|
||||||
// manually create a map in the config; extract
|
|
||||||
// its values as a map object.
|
|
||||||
conf.set("foo.bar", "A");
|
|
||||||
conf.set("foo.baz", "B");
|
|
||||||
assertEquals("A", conf.get("foo.bar"));
|
|
||||||
assertEquals("B", conf.get("foo.baz"));
|
|
||||||
|
|
||||||
Map<String, String> out = conf.getMap("foo");
|
|
||||||
assertEquals("A", out.get("bar"));
|
|
||||||
assertEquals("B", out.get("baz"));
|
|
||||||
|
|
||||||
Map<String, String> in = new HashMap<String, String>();
|
|
||||||
in.put("yak", "123");
|
|
||||||
in.put("bop", "456");
|
|
||||||
conf.setMap("quux", in);
|
|
||||||
|
|
||||||
// Assert that we can extract individual entries in
|
|
||||||
// the nested map ok.
|
|
||||||
assertEquals("123", conf.get("quux.yak"));
|
|
||||||
|
|
||||||
// Assert that we can get the whole map back out again.
|
|
||||||
out = conf.getMap("quux");
|
|
||||||
assertEquals("123", out.get("yak"));
|
|
||||||
assertEquals("456", out.get("bop"));
|
|
||||||
|
|
||||||
// Test that substitution is handled by getMap().
|
|
||||||
conf.set("subparam", "foo");
|
|
||||||
conf.set("mymap.someprop", "AAA${subparam}BBB");
|
|
||||||
out = conf.getMap("mymap");
|
|
||||||
assertEquals("AAAfooBBB", out.get("someprop"));
|
|
||||||
|
|
||||||
// Test deprecation of maps.
|
|
||||||
Configuration.addDeprecation("oldfoo", new String[]{"newfoo"});
|
|
||||||
conf.set("newfoo.a", "A");
|
|
||||||
conf.set("newfoo.b", "B");
|
|
||||||
out = conf.getMap("oldfoo");
|
|
||||||
assertEquals("A", out.get("a"));
|
|
||||||
assertEquals("B", out.get("b"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testPattern() throws IOException {
|
public void testPattern() throws IOException {
|
||||||
out = new BufferedWriter(new FileWriter(CONFIG));
|
out = new BufferedWriter(new FileWriter(CONFIG));
|
||||||
startConfig();
|
startConfig();
|
||||||
|
|
|
@ -17,45 +17,29 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.io.serializer;
|
package org.apache.hadoop.io.serializer;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.util.GenericsUtil;
|
import org.apache.hadoop.util.GenericsUtil;
|
||||||
|
|
||||||
public class SerializationTestUtil {
|
public class SerializationTestUtil {
|
||||||
|
|
||||||
/**
|
|
||||||
* A utility that tests serialization/deserialization.
|
|
||||||
* @param <K> the class of the item
|
|
||||||
* @param conf configuration to use, "io.serializations" is read to
|
|
||||||
* determine the serialization
|
|
||||||
* @param before item to (de)serialize
|
|
||||||
* @return deserialized item
|
|
||||||
*/
|
|
||||||
public static<K> K testSerialization(Configuration conf, K before)
|
|
||||||
throws Exception {
|
|
||||||
Map<String, String> metadata =
|
|
||||||
SerializationBase.getMetadataFromClass(GenericsUtil.getClass(before));
|
|
||||||
return testSerialization(conf, metadata, before);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A utility that tests serialization/deserialization.
|
* A utility that tests serialization/deserialization.
|
||||||
* @param conf configuration to use, "io.serializations" is read to
|
* @param conf configuration to use, "io.serializations" is read to
|
||||||
* determine the serialization
|
* determine the serialization
|
||||||
* @param metadata the metadata to pass to the serializer/deserializer
|
|
||||||
* @param <K> the class of the item
|
* @param <K> the class of the item
|
||||||
* @param before item to (de)serialize
|
* @param before item to (de)serialize
|
||||||
* @return deserialized item
|
* @return deserialized item
|
||||||
*/
|
*/
|
||||||
public static <K> K testSerialization(Configuration conf,
|
public static <K> K testSerialization(Configuration conf, K before)
|
||||||
Map<String, String> metadata, K before) throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
SerializationFactory factory = new SerializationFactory(conf);
|
SerializationFactory factory = new SerializationFactory(conf);
|
||||||
SerializerBase<K> serializer = factory.getSerializer(metadata);
|
Serializer<K> serializer
|
||||||
DeserializerBase<K> deserializer = factory.getDeserializer(metadata);
|
= factory.getSerializer(GenericsUtil.getClass(before));
|
||||||
|
Deserializer<K> deserializer
|
||||||
|
= factory.getDeserializer(GenericsUtil.getClass(before));
|
||||||
|
|
||||||
DataOutputBuffer out = new DataOutputBuffer();
|
DataOutputBuffer out = new DataOutputBuffer();
|
||||||
serializer.open(out);
|
serializer.open(out);
|
||||||
|
|
|
@ -1,175 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.io.serializer;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.io.TestGenericWritable.CONF_TEST_KEY;
|
|
||||||
import static org.apache.hadoop.io.TestGenericWritable.CONF_TEST_VALUE;
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.avro.util.Utf8;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
|
||||||
import org.apache.hadoop.io.RawComparator;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.io.WritableComparable;
|
|
||||||
import org.apache.hadoop.io.serializer.avro.AvroSerialization;
|
|
||||||
import org.apache.hadoop.io.serializer.avro.AvroGenericSerialization;
|
|
||||||
import org.apache.hadoop.util.GenericsUtil;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test the getRawComparator API of the various serialization systems.
|
|
||||||
*/
|
|
||||||
public class TestRawComparators extends TestCase {
|
|
||||||
|
|
||||||
private Configuration conf;
|
|
||||||
|
|
||||||
public void setUp() {
|
|
||||||
conf = new Configuration();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** A WritableComparable that is guaranteed to use the
|
|
||||||
* generic WritableComparator.
|
|
||||||
*/
|
|
||||||
public static class FooWritable implements WritableComparable<FooWritable> {
|
|
||||||
|
|
||||||
public long val;
|
|
||||||
|
|
||||||
public FooWritable() {
|
|
||||||
this.val = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public FooWritable(long v) {
|
|
||||||
this.val = v;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void write(DataOutput out) throws IOException {
|
|
||||||
out.writeLong(val);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void readFields(DataInput in) throws IOException {
|
|
||||||
val = in.readLong();
|
|
||||||
}
|
|
||||||
|
|
||||||
public int compareTo(FooWritable other) {
|
|
||||||
return new Long(val).compareTo(other.val);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void runComparisonTest(Object low, Object high) throws Exception {
|
|
||||||
Map<String, String> metadata =
|
|
||||||
SerializationBase.getMetadataFromClass(GenericsUtil.getClass(low));
|
|
||||||
runComparisonTest(low, high, metadata);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void runComparisonTest(Object low, Object high,
|
|
||||||
Map<String, String> metadata) throws Exception {
|
|
||||||
|
|
||||||
DataOutputBuffer out1 = new DataOutputBuffer();
|
|
||||||
DataOutputBuffer out2 = new DataOutputBuffer();
|
|
||||||
DataInputBuffer in1 = new DataInputBuffer();
|
|
||||||
DataInputBuffer in2 = new DataInputBuffer();
|
|
||||||
|
|
||||||
SerializationFactory factory = new SerializationFactory(conf);
|
|
||||||
|
|
||||||
// Serialize some data to two byte streams.
|
|
||||||
SerializerBase serializer = factory.getSerializer(metadata);
|
|
||||||
assertNotNull("Serializer is null!", serializer);
|
|
||||||
|
|
||||||
serializer.open(out1);
|
|
||||||
serializer.serialize(low);
|
|
||||||
serializer.close();
|
|
||||||
|
|
||||||
serializer.open(out2);
|
|
||||||
serializer.serialize(high);
|
|
||||||
serializer.close();
|
|
||||||
|
|
||||||
// Shift that data into an input buffer.
|
|
||||||
in1.reset(out1.getData(), out1.getLength());
|
|
||||||
in2.reset(out2.getData(), out2.getLength());
|
|
||||||
|
|
||||||
// Get the serialization and then the RawComparator;
|
|
||||||
// use these to compare the data in the input streams and
|
|
||||||
// assert that the low stream (1) is less than the high stream (2).
|
|
||||||
|
|
||||||
SerializationBase serializationBase = factory.getSerialization(metadata);
|
|
||||||
assertNotNull("Null SerializationBase!", serializationBase);
|
|
||||||
|
|
||||||
RawComparator rawComparator = serializationBase.getRawComparator(metadata);
|
|
||||||
assertNotNull("Null raw comparator!", rawComparator);
|
|
||||||
int actual = rawComparator.compare(in1.getData(), 0, in1.getLength(),
|
|
||||||
in2.getData(), 0, in2.getLength());
|
|
||||||
assertTrue("Did not compare FooWritable correctly", actual < 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testBasicWritable() throws Exception {
|
|
||||||
// Test that a WritableComparable can be used with this API
|
|
||||||
// correctly.
|
|
||||||
|
|
||||||
FooWritable low = new FooWritable(10);
|
|
||||||
FooWritable high = new FooWritable(42);
|
|
||||||
|
|
||||||
runComparisonTest(low, high);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testTextWritable() throws Exception {
|
|
||||||
// Test that a Text object (which uses Writable serialization, and
|
|
||||||
// has its own RawComparator implementation) can be used with this
|
|
||||||
// API correctly.
|
|
||||||
|
|
||||||
Text low = new Text("aaa");
|
|
||||||
Text high = new Text("zzz");
|
|
||||||
|
|
||||||
runComparisonTest(low, high);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testAvroComparator() throws Exception {
|
|
||||||
// Test a record created via an Avro schema that doesn't have a fixed
|
|
||||||
// class associated with it.
|
|
||||||
|
|
||||||
Schema s1 = Schema.create(Schema.Type.INT);
|
|
||||||
|
|
||||||
// Create a metadata mapping containing an Avro schema and a request to use
|
|
||||||
// Avro generic serialization.
|
|
||||||
Map<String, String> metadata = new HashMap<String, String>();
|
|
||||||
metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, s1.toString());
|
|
||||||
metadata.put(SerializationBase.SERIALIZATION_KEY,
|
|
||||||
AvroGenericSerialization.class.getName());
|
|
||||||
|
|
||||||
runComparisonTest(new Integer(42), new Integer(123), metadata);
|
|
||||||
|
|
||||||
// Now test it with a string record type.
|
|
||||||
Schema s2 = Schema.create(Schema.Type.STRING);
|
|
||||||
metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, s2.toString());
|
|
||||||
runComparisonTest(new Utf8("baz"), new Utf8("meep"), metadata);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -22,22 +22,10 @@ import static org.apache.hadoop.io.TestGenericWritable.CONF_TEST_KEY;
|
||||||
import static org.apache.hadoop.io.TestGenericWritable.CONF_TEST_VALUE;
|
import static org.apache.hadoop.io.TestGenericWritable.CONF_TEST_VALUE;
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
|
||||||
import org.apache.hadoop.io.RawComparator;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.TestGenericWritable.Foo;
|
|
||||||
import org.apache.hadoop.io.TestGenericWritable.Bar;
|
|
||||||
import org.apache.hadoop.io.TestGenericWritable.Baz;
|
import org.apache.hadoop.io.TestGenericWritable.Baz;
|
||||||
import org.apache.hadoop.io.TestGenericWritable.FooGenericWritable;
|
import org.apache.hadoop.io.TestGenericWritable.FooGenericWritable;
|
||||||
import org.apache.hadoop.io.serializer.DeserializerBase;
|
|
||||||
import org.apache.hadoop.io.serializer.SerializationBase;
|
|
||||||
import org.apache.hadoop.io.serializer.SerializerBase;
|
|
||||||
import org.apache.hadoop.util.GenericsUtil;
|
|
||||||
|
|
||||||
public class TestWritableSerialization extends TestCase {
|
public class TestWritableSerialization extends TestCase {
|
||||||
|
|
||||||
|
@ -49,7 +37,6 @@ public class TestWritableSerialization extends TestCase {
|
||||||
assertEquals(before, after);
|
assertEquals(before, after);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void testWritableConfigurable() throws Exception {
|
public void testWritableConfigurable() throws Exception {
|
||||||
|
|
||||||
//set the configuration parameter
|
//set the configuration parameter
|
||||||
|
@ -65,118 +52,4 @@ public class TestWritableSerialization extends TestCase {
|
||||||
assertEquals(baz, result);
|
assertEquals(baz, result);
|
||||||
assertNotNull(result.getConf());
|
assertNotNull(result.getConf());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public void testIgnoreMisconfiguredMetadata() throws IOException {
|
|
||||||
// If SERIALIZATION_KEY is set, still need class name.
|
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
Map<String, String> metadata = new HashMap<String, String>();
|
|
||||||
metadata.put(SerializationBase.SERIALIZATION_KEY,
|
|
||||||
WritableSerialization.class.getName());
|
|
||||||
SerializationFactory factory = new SerializationFactory(conf);
|
|
||||||
SerializationBase serialization = factory.getSerialization(metadata);
|
|
||||||
assertNull("Got serializer without any class info", serialization);
|
|
||||||
|
|
||||||
metadata.put(SerializationBase.CLASS_KEY,
|
|
||||||
Text.class.getName());
|
|
||||||
serialization = factory.getSerialization(metadata);
|
|
||||||
assertNotNull("Didn't get serialization!", serialization);
|
|
||||||
assertTrue("Wrong serialization class",
|
|
||||||
serialization instanceof WritableSerialization);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public void testReuseSerializer() throws IOException {
|
|
||||||
// Test that we can write multiple objects of the same type
|
|
||||||
// through the same serializer.
|
|
||||||
|
|
||||||
DataOutputBuffer out = new DataOutputBuffer();
|
|
||||||
SerializationFactory factory = new SerializationFactory(
|
|
||||||
new Configuration());
|
|
||||||
|
|
||||||
// Create a few Foo objects and serialize them.
|
|
||||||
Foo foo = new Foo();
|
|
||||||
Foo foo2 = new Foo();
|
|
||||||
Map<String, String> metadata = SerializationBase.getMetadataFromClass(
|
|
||||||
GenericsUtil.getClass(foo));
|
|
||||||
|
|
||||||
SerializerBase fooSerializer = factory.getSerializer(metadata);
|
|
||||||
fooSerializer.open(out);
|
|
||||||
fooSerializer.serialize(foo);
|
|
||||||
fooSerializer.serialize(foo2);
|
|
||||||
fooSerializer.close();
|
|
||||||
|
|
||||||
out.reset();
|
|
||||||
|
|
||||||
// Create a new serializer for Bar objects
|
|
||||||
Bar bar = new Bar();
|
|
||||||
Baz baz = new Baz(); // Baz inherits from Bar.
|
|
||||||
metadata = SerializationBase.getMetadataFromClass(
|
|
||||||
GenericsUtil.getClass(bar));
|
|
||||||
// Check that we can serialize Bar objects.
|
|
||||||
SerializerBase barSerializer = factory.getSerializer(metadata);
|
|
||||||
barSerializer.open(out);
|
|
||||||
barSerializer.serialize(bar); // this should work.
|
|
||||||
try {
|
|
||||||
// This should not work. We should not allow subtype serialization.
|
|
||||||
barSerializer.serialize(baz);
|
|
||||||
fail("Expected IOException serializing baz via bar serializer.");
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
// Expected.
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// This should not work. Disallow unrelated type serialization.
|
|
||||||
barSerializer.serialize(foo);
|
|
||||||
fail("Expected IOException serializing foo via bar serializer.");
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
// Expected.
|
|
||||||
}
|
|
||||||
|
|
||||||
barSerializer.close();
|
|
||||||
out.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Test the SerializationBase.checkSerializationKey() method.
|
|
||||||
class DummySerializationBase extends SerializationBase<Object> {
|
|
||||||
public boolean accept(Map<String, String> metadata) {
|
|
||||||
return checkSerializationKey(metadata);
|
|
||||||
}
|
|
||||||
|
|
||||||
public SerializerBase<Object> getSerializer(Map<String, String> metadata) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public DeserializerBase<Object> getDeserializer(Map<String, String> metadata) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RawComparator<Object> getRawComparator(Map<String, String> metadata) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSerializationKeyCheck() {
|
|
||||||
DummySerializationBase dummy = new DummySerializationBase();
|
|
||||||
Map<String, String> metadata = new HashMap<String, String>();
|
|
||||||
|
|
||||||
assertTrue("Didn't accept empty metadata", dummy.accept(metadata));
|
|
||||||
|
|
||||||
metadata.put(SerializationBase.SERIALIZATION_KEY,
|
|
||||||
DummySerializationBase.class.getName());
|
|
||||||
assertTrue("Didn't accept valid metadata", dummy.accept(metadata));
|
|
||||||
|
|
||||||
metadata.put(SerializationBase.SERIALIZATION_KEY, "foo");
|
|
||||||
assertFalse("Accepted invalid metadata", dummy.accept(metadata));
|
|
||||||
|
|
||||||
try {
|
|
||||||
dummy.accept((Map<String, String>) null);
|
|
||||||
// Shouldn't get here!
|
|
||||||
fail("Somehow didn't actually test the method we expected");
|
|
||||||
} catch (NullPointerException npe) {
|
|
||||||
// expected this.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,46 +18,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.io.serializer.avro;
|
package org.apache.hadoop.io.serializer.avro;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
import org.apache.avro.util.Utf8;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.serializer.SerializationBase;
|
|
||||||
import org.apache.hadoop.io.serializer.SerializationFactory;
|
|
||||||
import org.apache.hadoop.io.serializer.SerializationTestUtil;
|
import org.apache.hadoop.io.serializer.SerializationTestUtil;
|
||||||
|
|
||||||
public class TestAvroSerialization extends TestCase {
|
public class TestAvroSerialization extends TestCase {
|
||||||
|
|
||||||
private static final Configuration conf = new Configuration();
|
private static final Configuration conf = new Configuration();
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public void testIgnoreMisconfiguredMetadata() {
|
|
||||||
// If SERIALIZATION_KEY is set, still need class name.
|
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
Map<String, String> metadata = new HashMap<String, String>();
|
|
||||||
SerializationFactory factory = new SerializationFactory(conf);
|
|
||||||
SerializationBase serialization = null;
|
|
||||||
|
|
||||||
metadata.put(SerializationBase.SERIALIZATION_KEY,
|
|
||||||
AvroGenericSerialization.class.getName());
|
|
||||||
serialization = factory.getSerialization(metadata);
|
|
||||||
assertNull("Got serializer without any class info", serialization);
|
|
||||||
|
|
||||||
metadata.put(SerializationBase.SERIALIZATION_KEY,
|
|
||||||
AvroReflectSerialization.class.getName());
|
|
||||||
serialization = factory.getSerialization(metadata);
|
|
||||||
assertNull("Got serializer without any class info", serialization);
|
|
||||||
|
|
||||||
metadata.put(SerializationBase.SERIALIZATION_KEY,
|
|
||||||
AvroSpecificSerialization.class.getName());
|
|
||||||
serialization = factory.getSerialization(metadata);
|
|
||||||
assertNull("Got serializer without any class info", serialization);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSpecific() throws Exception {
|
public void testSpecific() throws Exception {
|
||||||
AvroRecord before = new AvroRecord();
|
AvroRecord before = new AvroRecord();
|
||||||
before.intField = 5;
|
before.intField = 5;
|
||||||
|
@ -91,16 +60,6 @@ public class TestAvroSerialization extends TestCase {
|
||||||
assertEquals(before, after);
|
assertEquals(before, after);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testGeneric() throws Exception {
|
|
||||||
Utf8 before = new Utf8("hadoop");
|
|
||||||
Map<String, String> metadata = new HashMap<String, String>();
|
|
||||||
metadata.put(SerializationBase.SERIALIZATION_KEY,
|
|
||||||
AvroGenericSerialization.class.getName());
|
|
||||||
metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, "\"string\"");
|
|
||||||
Utf8 after = SerializationTestUtil.testSerialization(conf, metadata, before);
|
|
||||||
assertEquals(before, after);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class InnerRecord {
|
public static class InnerRecord {
|
||||||
public int x = 7;
|
public int x = 7;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue