HADOOP-6949. Reduce RPC packet size of primitive arrays using ArrayPrimitiveWritable instead of ObjectWritable. Contributed by Matt Foley.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1083957 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2011-03-21 21:10:06 +00:00
parent 1187d69eed
commit 0d55e1a144
6 changed files with 573 additions and 4 deletions

View File

@ -85,6 +85,9 @@ Trunk (unreleased changes)
(Daryn Sharp via szetszwo)
OPTIMIZATIONS
HADOOP-6949. Reduce RPC packet size of primitive arrays using
ArrayPrimitiveWritable instead of ObjectWritable. (Matt Foley via suresh)
BUG FIXES

View File

@ -0,0 +1,346 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This is a wrapper class. It wraps a Writable implementation around
* an array of primitives (e.g., int[], long[], etc.), with optimized
* wire format, and without creating new objects per element.
*
* This is a wrapper class only; it does not make a copy of the
* underlying array.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ArrayPrimitiveWritable implements Writable {
//componentType is determined from the component type of the value array
//during a "set" operation. It must be primitive.
private Class<?> componentType = null;
//declaredComponentType need not be declared, but if you do (by using the
//ArrayPrimitiveWritable(Class<?>) constructor), it will provide typechecking
//for all "set" operations.
private Class<?> declaredComponentType = null;
private int length;
private Object value; //must be an array of <componentType>[length]
private static final Map<String, Class<?>> PRIMITIVE_NAMES =
new HashMap<String, Class<?>>(16);
static {
PRIMITIVE_NAMES.put(boolean.class.getName(), boolean.class);
PRIMITIVE_NAMES.put(byte.class.getName(), byte.class);
PRIMITIVE_NAMES.put(char.class.getName(), char.class);
PRIMITIVE_NAMES.put(short.class.getName(), short.class);
PRIMITIVE_NAMES.put(int.class.getName(), int.class);
PRIMITIVE_NAMES.put(long.class.getName(), long.class);
PRIMITIVE_NAMES.put(float.class.getName(), float.class);
PRIMITIVE_NAMES.put(double.class.getName(), double.class);
}
private static Class<?> getPrimitiveClass(String className) {
return PRIMITIVE_NAMES.get(className);
}
private static void checkPrimitive(Class<?> componentType) {
if (componentType == null) {
throw new HadoopIllegalArgumentException("null component type not allowed");
}
if (! PRIMITIVE_NAMES.containsKey(componentType.getName())) {
throw new HadoopIllegalArgumentException("input array component type "
+ componentType.getName() + " is not a candidate primitive type");
}
}
private void checkDeclaredComponentType(Class<?> componentType) {
if ((declaredComponentType != null)
&& (componentType != declaredComponentType)) {
throw new HadoopIllegalArgumentException("input array component type "
+ componentType.getName() + " does not match declared type "
+ declaredComponentType.getName());
}
}
private static void checkArray(Object value) {
if (value == null) {
throw new HadoopIllegalArgumentException("null value not allowed");
}
if (! value.getClass().isArray()) {
throw new HadoopIllegalArgumentException("non-array value of class "
+ value.getClass() + " not allowed");
}
}
/**
* Construct an empty instance, for use during Writable read
*/
public ArrayPrimitiveWritable() {
//empty constructor
}
/**
* Construct an instance of known type but no value yet
* for use with type-specific wrapper classes
*/
public ArrayPrimitiveWritable(Class<?> componentType) {
checkPrimitive(componentType);
this.declaredComponentType = componentType;
}
/**
* Wrap an existing array of primitives
* @param value - array of primitives
*/
public ArrayPrimitiveWritable(Object value) {
set(value);
}
/**
* Get the original array.
* Client must cast it back to type componentType[]
* (or may use type-specific wrapper classes).
* @return - original array as Object
*/
public Object get() { return value; }
public Class<?> getComponentType() { return componentType; }
public Class<?> getDeclaredComponentType() { return declaredComponentType; }
public boolean isDeclaredComponentType(Class<?> componentType) {
return componentType == declaredComponentType;
}
public void set(Object value) {
checkArray(value);
Class<?> componentType = value.getClass().getComponentType();
checkPrimitive(componentType);
checkDeclaredComponentType(componentType);
this.componentType = componentType;
this.value = value;
this.length = Array.getLength(value);
}
/**
* Do not use this class.
* This is an internal class, purely for ObjectWritable to use as
* a label class for transparent conversions of arrays of primitives
* during wire protocol reads and writes.
*/
static class Internal extends ArrayPrimitiveWritable {
Internal() { //use for reads
super();
}
Internal(Object value) { //use for writes
super(value);
}
} //end Internal subclass declaration
/*
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
@Override
@SuppressWarnings("deprecation")
public void write(DataOutput out) throws IOException {
// write componentType
UTF8.writeString(out, componentType.getName());
// write length
out.writeInt(length);
// do the inner loop. Walk the decision tree only once.
if (componentType == Boolean.TYPE) { // boolean
writeBooleanArray(out);
} else if (componentType == Character.TYPE) { // char
writeCharArray(out);
} else if (componentType == Byte.TYPE) { // byte
writeByteArray(out);
} else if (componentType == Short.TYPE) { // short
writeShortArray(out);
} else if (componentType == Integer.TYPE) { // int
writeIntArray(out);
} else if (componentType == Long.TYPE) { // long
writeLongArray(out);
} else if (componentType == Float.TYPE) { // float
writeFloatArray(out);
} else if (componentType == Double.TYPE) { // double
writeDoubleArray(out);
} else {
throw new IOException("Component type " + componentType.toString()
+ " is set as the output type, but no encoding is implemented for this type.");
}
}
/*
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
*/
@Override
public void readFields(DataInput in) throws IOException {
// read and set the component type of the array
@SuppressWarnings("deprecation")
String className = UTF8.readString(in);
Class<?> componentType = getPrimitiveClass(className);
if (componentType == null) {
throw new IOException("encoded array component type "
+ className + " is not a candidate primitive type");
}
checkDeclaredComponentType(componentType);
this.componentType = componentType;
// read and set the length of the array
int length = in.readInt();
if (length < 0) {
throw new IOException("encoded array length is negative " + length);
}
this.length = length;
// construct and read in the array
value = Array.newInstance(componentType, length);
// do the inner loop. Walk the decision tree only once.
if (componentType == Boolean.TYPE) { // boolean
readBooleanArray(in);
} else if (componentType == Character.TYPE) { // char
readCharArray(in);
} else if (componentType == Byte.TYPE) { // byte
readByteArray(in);
} else if (componentType == Short.TYPE) { // short
readShortArray(in);
} else if (componentType == Integer.TYPE) { // int
readIntArray(in);
} else if (componentType == Long.TYPE) { // long
readLongArray(in);
} else if (componentType == Float.TYPE) { // float
readFloatArray(in);
} else if (componentType == Double.TYPE) { // double
readDoubleArray(in);
} else {
throw new IOException("Encoded type " + className
+ " converted to valid component type " + componentType.toString()
+ " but no encoding is implemented for this type.");
}
}
//For efficient implementation, there's no way around
//the following massive code duplication.
private void writeBooleanArray(DataOutput out) throws IOException {
boolean[] v = (boolean[]) value;
for (int i = 0; i < length; i++)
out.writeBoolean(v[i]);
}
private void writeCharArray(DataOutput out) throws IOException {
char[] v = (char[]) value;
for (int i = 0; i < length; i++)
out.writeChar(v[i]);
}
private void writeByteArray(DataOutput out) throws IOException {
out.write((byte[]) value, 0, length);
}
private void writeShortArray(DataOutput out) throws IOException {
short[] v = (short[]) value;
for (int i = 0; i < length; i++)
out.writeShort(v[i]);
}
private void writeIntArray(DataOutput out) throws IOException {
int[] v = (int[]) value;
for (int i = 0; i < length; i++)
out.writeInt(v[i]);
}
private void writeLongArray(DataOutput out) throws IOException {
long[] v = (long[]) value;
for (int i = 0; i < length; i++)
out.writeLong(v[i]);
}
private void writeFloatArray(DataOutput out) throws IOException {
float[] v = (float[]) value;
for (int i = 0; i < length; i++)
out.writeFloat(v[i]);
}
private void writeDoubleArray(DataOutput out) throws IOException {
double[] v = (double[]) value;
for (int i = 0; i < length; i++)
out.writeDouble(v[i]);
}
private void readBooleanArray(DataInput in) throws IOException {
boolean[] v = (boolean[]) value;
for (int i = 0; i < length; i++)
v[i] = in.readBoolean();
}
private void readCharArray(DataInput in) throws IOException {
char[] v = (char[]) value;
for (int i = 0; i < length; i++)
v[i] = in.readChar();
}
private void readByteArray(DataInput in) throws IOException {
in.readFully((byte[]) value, 0, length);
}
private void readShortArray(DataInput in) throws IOException {
short[] v = (short[]) value;
for (int i = 0; i < length; i++)
v[i] = in.readShort();
}
private void readIntArray(DataInput in) throws IOException {
int[] v = (int[]) value;
for (int i = 0; i < length; i++)
v[i] = in.readInt();
}
private void readLongArray(DataInput in) throws IOException {
long[] v = (long[]) value;
for (int i = 0; i < length; i++)
v[i] = in.readLong();
}
private void readFloatArray(DataInput in) throws IOException {
float[] v = (float[]) value;
for (int i = 0; i < length; i++)
v[i] = in.readFloat();
}
private void readDoubleArray(DataInput in) throws IOException {
double[] v = (double[]) value;
for (int i = 0; i < length; i++)
v[i] = in.readDouble();
}
}

View File

@ -115,22 +115,51 @@ public void write(DataOutput out) throws IOException {
public static void writeObject(DataOutput out, Object instance,
Class declaredClass,
Configuration conf) throws IOException {
writeObject(out, instance, declaredClass, conf, false);
}
/**
* Write a {@link Writable}, {@link String}, primitive type, or an array of
* the preceding.
*
* @param allowCompactArrays - set true for RPC and internal or intra-cluster
* usages. Set false for inter-cluster, File, and other persisted output
* usages, to preserve the ability to interchange files with other clusters
* that may not be running the same version of software. Sometime in ~2013
* we can consider removing this parameter and always using the compact format.
*/
public static void writeObject(DataOutput out, Object instance,
Class declaredClass, Configuration conf, boolean allowCompactArrays)
throws IOException {
if (instance == null) { // null
instance = new NullInstance(declaredClass, conf);
declaredClass = Writable.class;
}
// Special case: must come before writing out the declaredClass.
// If this is an eligible array of primitives,
// wrap it in an ArrayPrimitiveWritable$Internal wrapper class.
if (allowCompactArrays && declaredClass.isArray()
&& instance.getClass().getName().equals(declaredClass.getName())
&& instance.getClass().getComponentType().isPrimitive()) {
instance = new ArrayPrimitiveWritable.Internal(instance);
declaredClass = ArrayPrimitiveWritable.Internal.class;
}
UTF8.writeString(out, declaredClass.getName()); // always write declared
if (declaredClass.isArray()) { // array
if (declaredClass.isArray()) { // non-primitive or non-compact array
int length = Array.getLength(instance);
out.writeInt(length);
for (int i = 0; i < length; i++) {
writeObject(out, Array.get(instance, i),
declaredClass.getComponentType(), conf);
declaredClass.getComponentType(), conf, allowCompactArrays);
}
} else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
((ArrayPrimitiveWritable.Internal) instance).write(out);
} else if (declaredClass == String.class) { // String
UTF8.writeString(out, (String)instance);
@ -219,6 +248,15 @@ public static Object readObject(DataInput in, ObjectWritable objectWritable, Con
Array.set(instance, i, readObject(in, conf));
}
} else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
// Read and unwrap ArrayPrimitiveWritable$Internal array.
// Always allow the read, even if write is disabled by allowCompactArrays.
ArrayPrimitiveWritable.Internal temp =
new ArrayPrimitiveWritable.Internal();
temp.readFields(in);
instance = temp.get();
declaredClass = instance.getClass();
} else if (declaredClass == String.class) { // String
instance = UTF8.readString(in);
} else if (declaredClass.isEnum()) { // enum

View File

@ -105,7 +105,9 @@ public abstract class Server {
// 1 : Introduce ping and server does not throw away RPCs
// 3 : Introduce the protocol into the RPC connection header
// 4 : Introduced SASL security layer
public static final byte CURRENT_VERSION = 4;
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
// in ObjectWritable to efficiently transmit arrays of primitives
public static final byte CURRENT_VERSION = 5;
/**
* Initial and max size of response buffer

View File

@ -87,7 +87,7 @@ public void write(DataOutput out) throws IOException {
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
conf);
conf, true);
}
}

View File

@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.io.*;
import java.util.Arrays;
import org.apache.hadoop.util.StringUtils;
import org.junit.*;
import junit.framework.TestCase;
/** Unit tests for {@link ArrayPrimitiveWritable} */
public class TestArrayPrimitiveWritable extends TestCase {
static final boolean[] b = {true, true, false};
static final char[] c = {'a', 'b', 'c'};
static final byte[] by = {1, 2, 3};
static final short[] sh = {1, 2, 3};
static final int[] i = {1, 2, 3};
static final long[] lo = {1L, 2L, 3L};
static final float[] f = {(float) 1.0, (float) 2.5, (float) 3.3};
static final double[] d = {1.0, 2.5, 3.3};
static final Object[] bigSet = {b, c, by, sh, i, lo, f, d};
static final Object[] expectedResultSet = {b, b, c, c, by, by, sh, sh,
i, i, lo, lo, f, f, d, d};
final Object[] resultSet = new Object[bigSet.length * 2];
final DataOutputBuffer out = new DataOutputBuffer();
final DataInputBuffer in = new DataInputBuffer();
@Before
public void resetBuffers() throws IOException {
out.reset();
in.reset();
}
@Test
public void testMany() throws IOException {
//Write a big set of data, one of each primitive type array
for (Object x : bigSet) {
//write each test object two ways
//First, transparently via ObjectWritable
ObjectWritable.writeObject(out, x, x.getClass(), null, true);
//Second, explicitly via ArrayPrimitiveWritable
(new ArrayPrimitiveWritable(x)).write(out);
}
//Now read the data back in
in.reset(out.getData(), out.getLength());
for (int x = 0; x < resultSet.length; ) {
//First, transparently
resultSet[x++] = ObjectWritable.readObject(in, null);
//Second, explicitly
ArrayPrimitiveWritable apw = new ArrayPrimitiveWritable();
apw.readFields(in);
resultSet[x++] = apw.get();
}
//validate data structures and values
assertEquals(expectedResultSet.length, resultSet.length);
for (int x = 0; x < resultSet.length; x++) {
assertEquals("ComponentType of array " + x,
expectedResultSet[x].getClass().getComponentType(),
resultSet[x].getClass().getComponentType());
}
assertTrue("In and Out arrays didn't match values",
Arrays.deepEquals(expectedResultSet, resultSet));
}
@Test
@SuppressWarnings("deprecation")
public void testObjectLabeling() throws IOException {
//Do a few tricky experiments to make sure things are being written
//the way we expect
//Write the data array with ObjectWritable
//which will indirectly write it using APW.Internal
ObjectWritable.writeObject(out, i, i.getClass(), null, true);
//Write the corresponding APW directly with ObjectWritable
ArrayPrimitiveWritable apw = new ArrayPrimitiveWritable(i);
ObjectWritable.writeObject(out, apw, apw.getClass(), null, true);
//Get ready to read it back
in.reset(out.getData(), out.getLength());
//Read the int[] object as written by ObjectWritable, but
//"going around" ObjectWritable
String className = UTF8.readString(in);
assertEquals("The int[] written by ObjectWritable was not labelled as "
+ "an ArrayPrimitiveWritable.Internal",
ArrayPrimitiveWritable.Internal.class.getName(), className);
ArrayPrimitiveWritable.Internal apwi =
new ArrayPrimitiveWritable.Internal();
apwi.readFields(in);
assertEquals("The ArrayPrimitiveWritable.Internal component type was corrupted",
int.class, apw.getComponentType());
assertTrue("The int[] written by ObjectWritable as "
+ "ArrayPrimitiveWritable.Internal was corrupted",
Arrays.equals(i, (int[])(apwi.get())));
//Read the APW object as written by ObjectWritable, but
//"going around" ObjectWritable
String declaredClassName = UTF8.readString(in);
assertEquals("The APW written by ObjectWritable was not labelled as "
+ "declaredClass ArrayPrimitiveWritable",
ArrayPrimitiveWritable.class.getName(), declaredClassName);
className = UTF8.readString(in);
assertEquals("The APW written by ObjectWritable was not labelled as "
+ "class ArrayPrimitiveWritable",
ArrayPrimitiveWritable.class.getName(), className);
ArrayPrimitiveWritable apw2 =
new ArrayPrimitiveWritable();
apw2.readFields(in);
assertEquals("The ArrayPrimitiveWritable component type was corrupted",
int.class, apw2.getComponentType());
assertTrue("The int[] written by ObjectWritable as "
+ "ArrayPrimitiveWritable was corrupted",
Arrays.equals(i, (int[])(apw2.get())));
}
@Test
public void testOldFormat() throws IOException {
//Make sure we still correctly write the old format if desired.
//Write the data array with old ObjectWritable API
//which will set allowCompactArrays false.
ObjectWritable.writeObject(out, i, i.getClass(), null);
//Get ready to read it back
in.reset(out.getData(), out.getLength());
//Read the int[] object as written by ObjectWritable, but
//"going around" ObjectWritable
@SuppressWarnings("deprecation")
String className = UTF8.readString(in);
assertEquals("The int[] written by ObjectWritable as a non-compact array "
+ "was not labelled as an array of int",
i.getClass().getName(), className);
int length = in.readInt();
assertEquals("The int[] written by ObjectWritable as a non-compact array "
+ "was not expected length", i.length, length);
int[] readValue = new int[length];
try {
for (int i = 0; i < length; i++) {
readValue[i] = (int)((Integer)ObjectWritable.readObject(in, null));
}
} catch (Exception e) {
fail("The int[] written by ObjectWritable as a non-compact array "
+ "was corrupted. Failed to correctly read int[] of length "
+ length + ". Got exception:\n"
+ StringUtils.stringifyException(e));
}
assertTrue("The int[] written by ObjectWritable as a non-compact array "
+ "was corrupted.", Arrays.equals(i, readValue));
}
}