HBASE-5358 HBaseObjectWritable should be able to serialize/deserialize generic arrays

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1242649 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-02-10 03:37:32 +00:00
parent 4aebaffe43
commit c6519ca4c7
2 changed files with 190 additions and 35 deletions

View File

@ -47,17 +47,17 @@ import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@ -82,8 +82,8 @@ import org.apache.hadoop.hbase.filter.SkipFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.util.Bytes;
@ -122,6 +122,10 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
// Special code that means 'not-encoded'; in this case we do old school
// sending of the class name using reflection, etc.
private static final byte NOT_ENCODED = 0;
//Generic array means that the array type is not one of the pre-defined arrays
//in the CLASS_TO_CODE map, but we have to still encode the array since it's
//elements are serializable by this class.
private static final int GENERIC_ARRAY_CODE;
static {
////////////////////////////////////////////////////////////////////////////
// WARNING: Please do not insert, remove or swap any line in this static //
@ -244,10 +248,14 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
addToMap(RegionOpeningState.class, code++);
addToMap(HTableDescriptor[].class, code++);
addToMap(Append.class, code++);
addToMap(RowMutation.class, code++);
//java.lang.reflect.Array is a placeholder for arrays not defined above
GENERIC_ARRAY_CODE = code++;
addToMap(Array.class, GENERIC_ARRAY_CODE);
}
private Class<?> declaredClass;
@ -335,26 +343,33 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
}
}
static Integer getClassCode(final Class<?> c)
throws IOException {
Integer code = CLASS_TO_CODE.get(c);
if (code == null ) {
if (List.class.isAssignableFrom(c)) {
code = CLASS_TO_CODE.get(List.class);
} else if (Writable.class.isAssignableFrom(c)) {
code = CLASS_TO_CODE.get(Writable.class);
} else if (c.isArray()) {
code = CLASS_TO_CODE.get(Array.class);
} else if (Serializable.class.isAssignableFrom(c)){
code = CLASS_TO_CODE.get(Serializable.class);
}
}
return code;
}
/**
* Write out the code byte for passed Class.
* Write out the code for passed Class.
* @param out
* @param c
* @throws IOException
*/
static void writeClassCode(final DataOutput out, final Class<?> c)
throws IOException {
Integer code = CLASS_TO_CODE.get(c);
if (code == null ) {
if ( List.class.isAssignableFrom(c)) {
code = CLASS_TO_CODE.get(List.class);
}
else if (Writable.class.isAssignableFrom(c)) {
code = CLASS_TO_CODE.get(Writable.class);
}
else if (Serializable.class.isAssignableFrom(c)){
code = CLASS_TO_CODE.get(Serializable.class);
}
}
throws IOException {
Integer code = getClassCode(c);
if (code == null) {
LOG.error("Unsupported type " + c);
StackTraceElement[] els = new Exception().getStackTrace();
@ -366,7 +381,6 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
WritableUtils.writeVInt(out, code);
}
public static long getWritableSize(Object instance, Class declaredClass,
Configuration conf) {
long size = Bytes.SIZEOF_BYTE; // code
@ -418,11 +432,18 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
} else if(declClass.equals(Result [].class)) {
Result.writeArray(out, (Result [])instanceObj);
} else {
//if it is a Generic array, write the element's type
if (getClassCode(declaredClass) == GENERIC_ARRAY_CODE) {
Class<?> componentType = declaredClass.getComponentType();
writeClass(out, componentType);
}
int length = Array.getLength(instanceObj);
out.writeInt(length);
for (int i = 0; i < length; i++) {
writeObject(out, Array.get(instanceObj, i),
declClass.getComponentType(), conf);
Object item = Array.get(instanceObj, i);
writeObject(out, item,
item.getClass(), conf);
}
}
} else if (List.class.isAssignableFrom(declClass)) {
@ -495,6 +516,36 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
}
}
/** Writes the encoded class code as defined in CLASS_TO_CODE, or
* the whole class name if not defined in the mapping.
*/
static void writeClass(DataOutput out, Class<?> c) throws IOException {
Integer code = CLASS_TO_CODE.get(c);
if (code == null) {
WritableUtils.writeVInt(out, NOT_ENCODED);
Text.writeString(out, c.getName());
} else {
WritableUtils.writeVInt(out, code);
}
}
/** Reads and returns the class as written by {@link #writeClass(DataOutput, Class)} */
static Class<?> readClass(Configuration conf, DataInput in) throws IOException {
Class<?> instanceClass = null;
int b = (byte)WritableUtils.readVInt(in);
if (b == NOT_ENCODED) {
String className = Text.readString(in);
try {
instanceClass = getClassByName(conf, className);
} catch (ClassNotFoundException e) {
LOG.error("Can't find class " + className, e);
throw new IOException("Can't find class " + className, e);
}
} else {
instanceClass = CODE_TO_CLASS.get(b);
}
return instanceClass;
}
/**
* Read a {@link Writable}, {@link String}, primitive type, or an array of
@ -558,6 +609,13 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
Array.set(instance, i, readObject(in, conf));
}
}
} else if (declaredClass.equals(Array.class)) { //an array not declared in CLASS_TO_CODE
Class<?> componentType = readClass(conf, in);
int length = in.readInt();
instance = Array.newInstance(componentType, length);
for (int i = 0; i < length; i++) {
Array.set(instance, i, readObject(in, conf));
}
} else if (List.class.isAssignableFrom(declaredClass)) { // List
int length = in.readInt();
instance = new ArrayList(length);

View File

@ -19,25 +19,37 @@
*/
package org.apache.hadoop.hbase.io;
import java.io.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;
import org.junit.Assert;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
@Category(SmallTests.class)
public class TestHbaseObjectWritable extends TestCase {
@ -57,14 +69,14 @@ public class TestHbaseObjectWritable extends TestCase {
/*
* This is the code used to generate byte[] where
* HbaseObjectWritable used byte for code
*
*
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(byteStream);
HbaseObjectWritable.writeObject(out, bytes, byte[].class, conf);
byte[] ba = byteStream.toByteArray();
out.close();
*/
/*
* byte array generated by the folowing call
* HbaseObjectWritable.writeObject(out, new Text("Old"), Text.class, conf);
@ -73,7 +85,7 @@ public class TestHbaseObjectWritable extends TestCase {
Text txt = (Text)readByteArray(conf, baForText);
Text oldTxt = new Text("Old");
assertEquals(txt, oldTxt);
final byte A = 'A';
byte [] bytes = new byte[1];
bytes[0] = A;
@ -85,7 +97,7 @@ public class TestHbaseObjectWritable extends TestCase {
byte[] baOut = (byte[])readByteArray(conf, baForByteArray);
assertTrue(Bytes.equals(baOut, bytes));
}
/*
* helper method which reads byte array using HbaseObjectWritable.readObject()
*/
@ -98,7 +110,7 @@ public class TestHbaseObjectWritable extends TestCase {
dis.close();
return product;
}
@SuppressWarnings("boxing")
public void testReadObjectDataInputConfiguration() throws IOException {
Configuration conf = HBaseConfiguration.create();
@ -171,7 +183,7 @@ public class TestHbaseObjectWritable extends TestCase {
assertTrue(child instanceof CustomFilter);
assertEquals("mykey", ((CustomFilter)child).getKey());
}
public void testCustomSerializable() throws Exception {
Configuration conf = HBaseConfiguration.create();
@ -197,26 +209,111 @@ public class TestHbaseObjectWritable extends TestCase {
dis.close();
return product;
}
public static class A extends IntWritable {
public A() {}
public A(int a) {super(a);}
}
public static class B extends A {
int b;
public B() { }
public B(int a, int b) {
super(a);
this.b = b;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(b);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
this.b = in.readInt();
}
@Override
public boolean equals(Object o) {
if (o instanceof B) {
return this.get() == ((B) o).get() && this.b == ((B) o).b;
}
return false;
}
}
/** Tests for serialization of List and Arrays */
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testPolymorphismInSequences() throws Exception {
Configuration conf = HBaseConfiguration.create();
Object ret;
//test with lists
List<A> list = Lists.newArrayList(new A(42), new B(10, 100));
ret = doType(conf, list, list.getClass());
assertEquals(ret, list);
//test with Writable[]
Writable[] warr = new Writable[] {new A(42), new B(10, 100)};
ret = doType(conf, warr, warr.getClass());
Assert.assertArrayEquals((Writable[])ret, warr);
//test with arrays
A[] arr = new A[] {new A(42), new B(10, 100)};
ret = doType(conf, arr, arr.getClass());
Assert.assertArrayEquals((A[])ret, arr);
//test with double array
A[][] darr = new A[][] {new A[] { new A(42), new B(10, 100)}, new A[] {new A(12)}};
ret = doType(conf, darr, darr.getClass());
Assert.assertArrayEquals((A[][])ret, darr);
//test with List of arrays
List<A[]> larr = Lists.newArrayList(arr, new A[] {new A(99)});
ret = doType(conf, larr, larr.getClass());
List<A[]> lret = (List<A[]>) ret;
assertEquals(larr.size(), lret.size());
for (int i=0; i<lret.size(); i++) {
Assert.assertArrayEquals(larr.get(i), lret.get(i));
}
//test with array of lists
List[] alarr = new List[] {Lists.newArrayList(new A(1), new A(2)),
Lists.newArrayList(new B(4,5))};
ret = doType(conf, alarr, alarr.getClass());
List[] alret = (List[]) ret;
Assert.assertArrayEquals(alarr, alret);
//test with array of Text, note that Text[] is not pre-defined
Text[] tarr = new Text[] {new Text("foo"), new Text("bar")};
ret = doType(conf, tarr, tarr.getClass());
Assert.assertArrayEquals(tarr, (Text[])ret);
//test with byte[][]
byte[][] barr = new byte[][] {"foo".getBytes(), "baz".getBytes()};
ret = doType(conf, barr, barr.getClass());
Assert.assertArrayEquals(barr, (byte[][])ret);
}
public static class CustomSerializable implements Serializable {
private static final long serialVersionUID = 1048445561865740632L;
private String value = null;
public CustomSerializable() {
}
public CustomSerializable(String value) {
this.value = value;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class CustomWritable implements Writable {