HBASE-3400 Coprocessor Support for Generic Interfaces

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1067252 13f79535-47bb-0310-9956-ffa450edef68
Author: Gary Helmling  2011-02-04 18:25:40 +00:00
parent d1176f5791
commit e8f33fc4cc
8 changed files with 197 additions and 46 deletions

CHANGES.txt

@@ -40,6 +40,8 @@ Release 0.91.0 - Unreleased
the query matcher and can lead to incorrect behavior
HBASE-3492 NPE while splitting table with empty column family store
HBASE-3495 Shell is failing on subsequent split calls
+HBASE-3400 Coprocessor Support for Generic Interfaces
+ (Ed Kohlwey via Gary Helmling)
IMPROVEMENTS

org/apache/hadoop/hbase/client/coprocessor/Exec.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.Invocation;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Classes;
import java.io.DataInput;
import java.io.DataOutput;
@@ -83,14 +84,37 @@ public class Exec extends Invocation implements Row {
@Override
public void write(DataOutput out) throws IOException {
-super.write(out);
+// fields for Invocation
+out.writeUTF(this.methodName);
+out.writeInt(parameterClasses.length);
+for (int i = 0; i < parameterClasses.length; i++) {
+HbaseObjectWritable.writeObject(out, parameters[i], parameters[i].getClass(),
+conf);
+out.writeUTF(parameterClasses[i].getName());
+}
+// fields for Exec
Bytes.writeByteArray(out, referenceRow);
out.writeUTF(protocol.getName());
}
@Override
public void readFields(DataInput in) throws IOException {
-super.readFields(in);
+// fields for Invocation
+methodName = in.readUTF();
+parameters = new Object[in.readInt()];
+parameterClasses = new Class[parameters.length];
+HbaseObjectWritable objectWritable = new HbaseObjectWritable();
+for (int i = 0; i < parameters.length; i++) {
+parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
+this.conf);
+String parameterClassName = in.readUTF();
+try {
+parameterClasses[i] = Classes.extendedForName(parameterClassName);
+} catch (ClassNotFoundException e) {
+throw new IOException("Couldn't find class: " + parameterClassName);
+}
+}
+// fields for Exec
referenceRow = Bytes.readByteArray(in);
String protocolName = in.readUTF();
try {

org/apache/hadoop/hbase/client/coprocessor/ExecResult.java

@@ -21,11 +21,13 @@ package org.apache.hadoop.hbase.client.coprocessor;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Classes;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
/**
* Represents the return value from a
@@ -70,12 +72,25 @@ public class ExecResult implements Writable {
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, regionName);
HbaseObjectWritable.writeObject(out, value,
-(valueType != null ? valueType : Writable.class), null);
+value.getClass(), null);
+Class<?> alternativeSerializationClass;
+if(value instanceof Writable){
+alternativeSerializationClass = Writable.class;
+} else {
+alternativeSerializationClass = Serializable.class;
+}
+out.writeUTF((valueType != null ? valueType : alternativeSerializationClass).getName());
}
@Override
public void readFields(DataInput in) throws IOException {
regionName = Bytes.readByteArray(in);
value = HbaseObjectWritable.readObject(in, null);
+String className = in.readUTF();
+try {
+valueType = Classes.extendedForName(className);
+} catch (ClassNotFoundException e) {
+throw new IOException("Unable to find class of type: " + className );
+}
}
}
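Two changes here: the value is serialized with its concrete class instead of a declared type, and the declared type now travels separately as a class name, defaulting to Writable or Serializable depending on what the value implements. A sketch of that type-name round trip in isolation (regionName omitted), not part of the patch and assuming only the calls that appear in this file:

    import java.io.*;
    import org.apache.hadoop.hbase.io.HbaseObjectWritable;
    import org.apache.hadoop.hbase.util.Classes;

    public class ExecResultEncodingSketch {
      // Write a value the way ExecResult.write does (minus regionName),
      // then read it back the way readFields does.
      static Class<?> roundTripType(Object value)
          throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        HbaseObjectWritable.writeObject(out, value, value.getClass(), null);
        out.writeUTF(value.getClass().getName());
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        HbaseObjectWritable.readObject(in, null);     // recovers the value
        return Classes.extendedForName(in.readUTF()); // recovers the declared type
      }
    }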

org/apache/hadoop/hbase/ipc/Invocation.java

@@ -31,11 +31,11 @@ import java.lang.reflect.Method;
/** A method invocation, including the method name and its parameters.*/
public class Invocation implements Writable, Configurable {
-private String methodName;
+protected String methodName;
@SuppressWarnings("unchecked")
-private Class[] parameterClasses;
-private Object[] parameters;
-private Configuration conf;
+protected Class[] parameterClasses;
+protected Object[] parameters;
+protected Configuration conf;
public Invocation() {}

org/apache/hadoop/hbase/util/Classes.java

@@ -0,0 +1,44 @@
package org.apache.hadoop.hbase.util;
/**
* Utilities for class manipulation.
*/
public class Classes {
/**
* Equivalent of {@link Class#forName(String)} which also returns classes for
* primitives like <code>boolean</code>, etc.
*
* @param className
* The name of the class to retrieve. Can be either a normal class or
* a primitive class.
* @return The class specified by <code>className</code>
* @throws ClassNotFoundException
* If the requested class can not be found.
*/
public static Class<?> extendedForName(String className)
throws ClassNotFoundException {
Class<?> valueType;
if (className.equals("boolean")) {
valueType = boolean.class;
} else if (className.equals("byte")) {
valueType = byte.class;
} else if (className.equals("short")) {
valueType = short.class;
} else if (className.equals("int")) {
valueType = int.class;
} else if (className.equals("long")) {
valueType = long.class;
} else if (className.equals("float")) {
valueType = float.class;
} else if (className.equals("double")) {
valueType = double.class;
} else if (className.equals("char")) {
valueType = char.class;
} else {
valueType = Class.forName(className);
}
return valueType;
}
}
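The helper exists because the RPC layer deals in two different classes for the same argument: the concrete class of the autoboxed value and the declared, possibly primitive, parameter class. A short illustration of the distinction, not part of the patch:

    // Not part of the patch: declared vs. concrete class for an int argument.
    public class DeclaredVsConcreteDemo {
      public static void main(String[] args) throws ClassNotFoundException {
        Object boxed = 5; // autoboxes to java.lang.Integer
        System.out.println(boxed.getClass().getName()); // java.lang.Integer
        System.out.println(
            org.apache.hadoop.hbase.util.Classes.extendedForName("int")); // int
      }
    }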

org/apache/hadoop/hbase/coprocessor/GenericEndpoint.java

@@ -0,0 +1,11 @@
package org.apache.hadoop.hbase.coprocessor;
public class GenericEndpoint extends BaseEndpointCoprocessor implements
GenericProtocol {
@Override
public <T> T doWork(T genericObject) {
return genericObject;
}
}
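This endpoint simply echoes its argument, which is enough to exercise every primitive, array, String, and Writable case in the test below. Region servers load it from configuration; a minimal registration sketch mirroring the conf.setStrings call in the test setup, not part of the patch:

    package org.apache.hadoop.hbase.coprocessor;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class RegisterEndpointSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Same key the test setup below uses; listed classes are loaded per region.
        conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
            "org.apache.hadoop.hbase.coprocessor.GenericEndpoint");
      }
    }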

org/apache/hadoop/hbase/coprocessor/GenericProtocol.java

@@ -0,0 +1,17 @@
package org.apache.hadoop.hbase.coprocessor;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
public interface GenericProtocol extends CoprocessorProtocol {
/**
* Simple interface to allow the passing of a generic parameter to see if the
* RPC framework can accommodate generics.
*
* @param <T> the type of the value passed in and returned
* @param genericObject the value for the endpoint to echo back
* @return the same value that was passed in
*/
public <T> T doWork(T genericObject);
}
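On the client side, this interface is obtained as a dynamic RPC proxy bound to the region serving a given row; a sketch mirroring the testGeneric case below (table and row names are the test's), not part of the patch:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.coprocessor.GenericProtocol;
    import org.apache.hadoop.hbase.util.Bytes;

    public class GenericClientSketch {
      public static void main(String[] args) throws Throwable {
        HTable table = new HTable(HBaseConfiguration.create(), "TestTable");
        GenericProtocol proxy = table.coprocessorProxy(GenericProtocol.class,
            Bytes.toBytes("testRow"));
        int echoed = proxy.doWork(5); // a primitive now survives the round trip
        System.out.println(echoed);   // 5
      }
    }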

org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java

@@ -19,17 +19,25 @@
*/
package org.apache.hadoop.hbase.coprocessor;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.client.coprocessor.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.*;
-import org.apache.hadoop.conf.Configuration;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
-import java.util.Map;
import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
/**
* TestEndpoint: test cases to verify coprocessor Endpoint
@@ -39,12 +47,12 @@ public class TestCoprocessorEndpoint {
private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
-private static byte [] ROW = Bytes.toBytes("testRow");
+private static byte[] ROW = Bytes.toBytes("testRow");
private static final int ROWSIZE = 20;
private static final int rowSeperator1 = 5;
private static final int rowSeperator2 = 12;
-private static byte [][] ROWS = makeN(ROW, ROWSIZE);
+private static byte[][] ROWS = makeN(ROW, ROWSIZE);
private static HBaseTestingUtility util = new HBaseTestingUtility();
private static MiniHBaseCluster cluster = null;
@@ -53,18 +61,19 @@ public class TestCoprocessorEndpoint {
public static void setupBeforeClass() throws Exception {
// set configure to indicate which cp should be loaded
Configuration conf = util.getConfiguration();
-conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
-"org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint");
+conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+"org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint",
+"org.apache.hadoop.hbase.coprocessor.GenericEndpoint");
util.startMiniCluster(2);
cluster = util.getMiniHBaseCluster();
HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
-new byte[][]{ HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1],
-ROWS[rowSeperator2]});
+new byte[][] { HConstants.EMPTY_BYTE_ARRAY,
+ROWS[rowSeperator1], ROWS[rowSeperator2] });
-for(int i = 0; i < ROWSIZE; i++) {
+for (int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
table.put(put);
@@ -79,6 +88,35 @@ public class TestCoprocessorEndpoint {
util.shutdownMiniCluster();
}
+@Test
+public void testGeneric() throws Throwable {
+HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
+GenericProtocol protocol = table.coprocessorProxy(GenericProtocol.class,
+Bytes.toBytes("testRow"));
+String workResult1 = protocol.doWork("foo");
+assertEquals("foo", workResult1);
+byte[] workResult2 = protocol.doWork(new byte[]{1});
+assertArrayEquals(new byte[]{1}, workResult2);
+byte workResult3 = protocol.doWork((byte)1);
+assertEquals((byte)1, workResult3);
+char workResult4 = protocol.doWork('c');
+assertEquals('c', workResult4);
+boolean workResult5 = protocol.doWork(true);
+assertEquals(true, workResult5);
+short workResult6 = protocol.doWork((short)1);
+assertEquals((short)1, workResult6);
+int workResult7 = protocol.doWork(5);
+assertEquals(5, workResult7);
+long workResult8 = protocol.doWork(5l);
+assertEquals(5l, workResult8);
+double workResult9 = protocol.doWork(6d);
+assertEquals(6d, workResult9, 0.01);
+float workResult10 = protocol.doWork(6f);
+assertEquals(6f, workResult10, 0.01);
+Text workResult11 = protocol.doWork(new Text("foo"));
+assertEquals(new Text("foo"), workResult11);
+}
@Test
public void testAggregation() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
@@ -86,21 +124,21 @@ public class TestCoprocessorEndpoint {
Map<byte[], Long> results;
// scan: for all regions
-results = table.coprocessorExec(ColumnAggregationProtocol.class,
-ROWS[rowSeperator1 - 1],
-ROWS[rowSeperator2 + 1],
-new Batch.Call<ColumnAggregationProtocol,Long>() {
-public Long call(ColumnAggregationProtocol instance)
-throws IOException{
-return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
-}
-});
+results = table
+.coprocessorExec(ColumnAggregationProtocol.class,
+ROWS[rowSeperator1 - 1], ROWS[rowSeperator2 + 1],
+new Batch.Call<ColumnAggregationProtocol, Long>() {
+public Long call(ColumnAggregationProtocol instance)
+throws IOException {
+return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
+}
+});
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
sumResult += e.getValue();
}
-for(int i = 0;i < ROWSIZE; i++) {
+for (int i = 0; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", sumResult, expectedResult);
@@ -108,29 +146,29 @@ public class TestCoprocessorEndpoint {
results.clear();
// scan: for region 2 and region 3
-results = table.coprocessorExec(ColumnAggregationProtocol.class,
-ROWS[rowSeperator1 + 1],
-ROWS[rowSeperator2 + 1],
-new Batch.Call<ColumnAggregationProtocol,Long>() {
-public Long call(ColumnAggregationProtocol instance)
-throws IOException{
-return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
-}
-});
+results = table
+.coprocessorExec(ColumnAggregationProtocol.class,
+ROWS[rowSeperator1 + 1], ROWS[rowSeperator2 + 1],
+new Batch.Call<ColumnAggregationProtocol, Long>() {
+public Long call(ColumnAggregationProtocol instance)
+throws IOException {
+return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
+}
+});
sumResult = 0;
expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
sumResult += e.getValue();
}
-for(int i = rowSeperator1;i < ROWSIZE; i++) {
+for (int i = rowSeperator1; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", sumResult, expectedResult);
}
-private static byte [][] makeN(byte [] base, int n) {
-byte [][] ret = new byte[n][];
-for(int i=0;i<n;i++) {
+private static byte[][] makeN(byte[] base, int n) {
+byte[][] ret = new byte[n][];
+for (int i = 0; i < n; i++) {
ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
}
return ret;