HBASE-4946 HTable.coprocessorExec (and possibly coprocessorProxy) does not work with
dynamically loaded coprocessors (Andrei Dragomir)


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1212250 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-12-09 05:58:05 +00:00
parent e167c8ab28
commit abf897ad42
4 changed files with 57 additions and 7 deletions

View File

@ -453,6 +453,8 @@ Release 0.92.0 - Unreleased
HBASE-4945 NPE in HRegion.bulkLoadHFiles (Andrew P and Lars H)
HBASE-4942 HMaster is unable to start if HFile V1 is used (Honghua Zhu)
HBASE-4610 Port HBASE-3380 (Master failover can split logs of live servers) to 92/trunk
HBASE-4946 HTable.coprocessorExec (and possibly coprocessorProxy) does not work with
dynamically loaded coprocessors (Andrei Dragomir)
TESTS
HBASE-4450 test for number of blocks read: to serve as baseline for expected

View File

@ -56,6 +56,7 @@ public class Exec extends Invocation implements Row {
/** Row key used as a reference for any region lookups */
private byte[] referenceRow;
private Class<? extends CoprocessorProtocol> protocol;
private String protocolName;
public Exec() {
}
@ -68,6 +69,11 @@ public class Exec extends Invocation implements Row {
this.conf = configuration;
this.referenceRow = row;
this.protocol = protocol;
this.protocolName = protocol.getName();
}
public String getProtocolName() {
return protocolName;
} }
public Class<? extends CoprocessorProtocol> getProtocol() { public Class<? extends CoprocessorProtocol> getProtocol() {
@ -117,12 +123,6 @@ public class Exec extends Invocation implements Row {
} }
// fields for Exec
referenceRow = Bytes.readByteArray(in);
String protocolName = in.readUTF(); protocolName = in.readUTF();
try {
protocol = (Class<CoprocessorProtocol>)conf.getClassByName(protocolName);
}
catch (ClassNotFoundException cnfe) {
throw new IOException("Protocol class "+protocolName+" not found", cnfe);
}
}
}

View File

@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -182,6 +183,9 @@ public class HRegion implements HeapSize { // , Writable{
private ClassToInstanceMap<CoprocessorProtocol>
protocolHandlers = MutableClassToInstanceMap.create();
private Map<String, Class<? extends CoprocessorProtocol>>
protocolHandlerNames = Maps.newHashMap();
/** /**
* Temporary subdirectory of the region directory used for compaction output. * Temporary subdirectory of the region directory used for compaction output.
*/ */
@ -4397,6 +4401,7 @@ public class HRegion implements HeapSize { // , Writable{
}
protocolHandlers.putInstance(protocol, handler);
protocolHandlerNames.put(protocol.getName(), protocol);
if (LOG.isDebugEnabled()) {
LOG.debug("Registered protocol handler: region="+
Bytes.toStringBinary(getRegionName())+" protocol="+protocol.getName());
@ -4422,6 +4427,19 @@ public class HRegion implements HeapSize { // , Writable{
public ExecResult exec(Exec call)
throws IOException {
Class<? extends CoprocessorProtocol> protocol = call.getProtocol();
if (protocol == null) {
String protocolName = call.getProtocolName();
if (LOG.isDebugEnabled()) {
LOG.debug("Received dynamic protocol exec call with protocolName " + protocolName);
}
// detect the actual protocol class
protocol = protocolHandlerNames.get(protocolName);
if (protocol == null) {
throw new HBaseRPC.UnknownProtocolException(protocol,
"No matching handler for protocol "+protocolName+
" in region "+Bytes.toStringBinary(getRegionName()));
}
}
if (!protocolHandlers.containsKey(protocol)) {
throw new HBaseRPC.UnknownProtocolException(protocol,
"No matching handler for protocol "+protocol.getName()+

View File

@ -31,8 +31,13 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -49,6 +54,9 @@ public class TestCoprocessorEndpoint {
private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
private static byte[] ROW = Bytes.toBytes("testRow");
private static final String protocolName = "org.apache.hadoop.hbase.CustomProtocol";
private static final String methodName = "myFunc";
private static final int ROWSIZE = 20;
private static final int rowSeperator1 = 5;
private static final int rowSeperator2 = 12;
@ -171,6 +179,28 @@ public class TestCoprocessorEndpoint {
assertEquals("Invalid result", sumResult, expectedResult);
}
@Test
public void testExecDeserialization() throws IOException {
  // Hand-build the wire form an Exec produces in write(): method name,
  // parameter count, each parameter (value + declared class name), the
  // reference row, and finally the protocol class name.
  DataOutputBuffer dob = new DataOutputBuffer();
  dob.writeUTF(methodName);
  dob.writeInt(1);
  Scan scan = new Scan();
  HbaseObjectWritable.writeObject(dob, scan, Scan.class, new Configuration());
  dob.writeUTF("org.apache.hadoop.hbase.client.Scan");
  Bytes.writeByteArray(dob, new byte[]{'a'});
  // This protocol class is NOT loadable on the deserializing side; after
  // HBASE-4946, readFields must keep the name instead of throwing, so the
  // region server can resolve it against dynamically loaded coprocessors.
  dob.writeUTF(protocolName);

  DataInputBuffer dib = new DataInputBuffer();
  dib.reset(dob.getData(), dob.getLength());

  Exec after = new Exec();
  // Must not throw even though the protocol class cannot be resolved here.
  after.readFields(dib);

  // JUnit convention is assertEquals(expected, actual) — otherwise failure
  // messages report the values the wrong way around.
  assertEquals(protocolName, after.getProtocolName());
  assertEquals(methodName, after.getMethodName());
}
private static byte[][] makeN(byte[] base, int n) {
byte[][] ret = new byte[n][];
for (int i = 0; i < n; i++) {