HBASE-6895 Remove CoprocessorProtocol support and implementations
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1425010 13f79535-47bb-0310-9956-ffa450edef68
parent 1793a0bbcd
commit b70f27ac6a

File diff suppressed because it is too large
@@ -256,45 +256,6 @@ message BulkLoadHFileResponse {
   required bool loaded = 1;
 }
 
-/**
- * An individual coprocessor call. You must specify the protocol,
- * the method, and the row to which the call will be executed.
- *
- * You can specify the configuration settings in the property list.
- *
- * The parameter list has the parameters used for the method.
- * A parameter is a pair of parameter name and the binary parameter
- * value. The name is the parameter class name. The value is the
- * binary format of the parameter, for example, protocol buffer
- * encoded value.
- */
-message Exec {
-  required bytes row = 1;
-  required string protocolName = 2;
-  required string methodName = 3;
-  repeated NameStringPair property = 4;
-  repeated NameBytesPair parameter = 5;
-}
-
-/**
- * Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
- * method using the registered protocol handlers.
- * {@link CoprocessorProtocol} implementations must be registered via the
- * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(
- * Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)}
- * method before they are available.
- * @deprecated Use CoprocessorService going forward
- */
-message ExecCoprocessorRequest {
-  required RegionSpecifier region = 1;
-  required Exec call = 2;
-}
-
-// @deprecated Use CoprocessorService going forward
-message ExecCoprocessorResponse {
-  required NameBytesPair value = 1;
-}
-
 message CoprocessorServiceCall {
   required bytes row = 1;
   required string serviceName = 2;
@@ -319,7 +280,6 @@ message CoprocessorServiceResponse {
 message MultiAction {
   optional Mutate mutate = 1;
   optional Get get = 2;
-  optional Exec exec = 3;
 }
 
 /**
@@ -372,9 +332,6 @@ service ClientService {
   rpc bulkLoadHFile(BulkLoadHFileRequest)
     returns(BulkLoadHFileResponse);
 
-  rpc execCoprocessor(ExecCoprocessorRequest)
-    returns(ExecCoprocessorResponse);
-
   rpc execService(CoprocessorServiceRequest)
     returns(CoprocessorServiceResponse);
 
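The removed Exec plumbing shipped a Java protocol class name, a method name, and per-parameter class-name/value pairs over the wire; the surviving CoprocessorServiceCall/execService path instead names a protobuf service and method and carries one opaque serialized request message. A minimal sketch of the new request shape, assuming the generated ClientProtos classes; the hunk above cuts off after serviceName, so the methodName/request fields shown here, and the service and method names, are assumptions for illustration:

    // Sketch only: builds the call that replaces the removed Exec message.
    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;

    public class ServiceCallSketch {
      public static CoprocessorServiceCall buildCall(byte[] row, byte[] requestBytes) {
        return CoprocessorServiceCall.newBuilder()
            .setRow(ByteString.copyFrom(row))              // only locates the region
            .setServiceName("PingService")                 // protobuf service name (hypothetical)
            .setMethodName("ping")                         // method on that service (hypothetical)
            .setRequest(ByteString.copyFrom(requestBytes)) // serialized request message
            .build();
      }
    }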
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
@@ -299,37 +298,6 @@ public interface HConnection extends Abortable, Closeable {
     Object[] results,
     Batch.Callback<R> callback) throws IOException, InterruptedException;
 
-
-  /**
-   * Executes the given
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
-   * callable for each row in the given list and invokes
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
-   * for each result returned.
-   *
-   * @param protocol the protocol interface being called
-   * @param rows a list of row keys for which the callable should be invoked
-   * @param tableName table name for the coprocessor invoked
-   * @param pool ExecutorService used to submit the calls per row
-   * @param call instance on which to invoke
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
-   * for each row
-   * @param callback instance on which to invoke
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
-   * for each result
-   * @param <T> the protocol interface type
-   * @param <R> the callable's return type
-   * @throws IOException
-   * @deprecated CoprocessorProtocol replaced by CoprocessorService calls.
-   */
-  public <T extends CoprocessorProtocol,R> void processExecs(
-      final Class<T> protocol,
-      List<byte[]> rows,
-      final byte[] tableName,
-      ExecutorService pool,
-      final Batch.Call<T,R> call,
-      final Batch.Callback<R> callback) throws IOException, Throwable;
-
   /**
    * Enable or disable region cache prefetch for the table. It will be
    * applied for the given table's all HTable instances within this
@@ -37,7 +37,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -73,8 +72,6 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
-import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -2124,75 +2121,6 @@ public class HConnectionManager {
       }
     }
 
-
-    /**
-     * Executes the given
-     * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
-     * callable for each row in the
-     * given list and invokes
-     * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
-     * for each result returned.
-     *
-     * @param protocol the protocol interface being called
-     * @param rows a list of row keys for which the callable should be invoked
-     * @param tableName table name for the coprocessor invoked
-     * @param pool ExecutorService used to submit the calls per row
-     * @param callable instance on which to invoke
-     * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
-     * for each row
-     * @param callback instance on which to invoke
-     * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
-     * for each result
-     * @param <T> the protocol interface type
-     * @param <R> the callable's return type
-     * @throws IOException
-     */
-    @Deprecated
-    public <T extends CoprocessorProtocol,R> void processExecs(
-        final Class<T> protocol,
-        List<byte[]> rows,
-        final byte[] tableName,
-        ExecutorService pool,
-        final Batch.Call<T,R> callable,
-        final Batch.Callback<R> callback)
-      throws IOException, Throwable {
-
-      Map<byte[],Future<R>> futures =
-          new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
-      for (final byte[] r : rows) {
-        final ExecRPCInvoker invoker =
-            new ExecRPCInvoker(conf, this, protocol, tableName, r);
-        Future<R> future = pool.submit(
-            new Callable<R>() {
-              public R call() throws Exception {
-                T instance = (T)Proxy.newProxyInstance(conf.getClassLoader(),
-                    new Class[]{protocol},
-                    invoker);
-                R result = callable.call(instance);
-                byte[] region = invoker.getRegionName();
-                if (callback != null) {
-                  callback.update(region, r, result);
-                }
-                return result;
-              }
-            });
-        futures.put(r, future);
-      }
-      for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
-        try {
-          e.getValue().get();
-        } catch (ExecutionException ee) {
-          LOG.warn("Error executing for row "+Bytes.toStringBinary(e.getKey()), ee);
-          throw ee.getCause();
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted executing for row " +
-              Bytes.toStringBinary(e.getKey()), ie);
-        }
-      }
-    }
-
-
     /*
      * Return the number of cached region for a table. It will only be called
      * from a unit test.
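The body removed above is the heart of the old client-side mechanism: for each row it wrapped an ExecRPCInvoker in a java.lang.reflect.Proxy, submitted the call to the caller's ExecutorService, and joined the futures. Reduced to plain JDK classes, the proxy trick it relied on looks like the following self-contained illustration; Greeter and everything inside it are invented for the sketch:

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;

    interface Greeter {            // stands in for a CoprocessorProtocol subinterface
      String greet(String name);
    }

    public class ProxySketch {
      public static void main(String[] args) {
        // Every method call on the proxy funnels through this handler; this is
        // the point where ExecRPCInvoker marshalled the Method and args into
        // one Exec RPC against the region holding the row.
        InvocationHandler handler = (proxy, method, callArgs) ->
            method.getName() + "(" + callArgs[0] + ") would become one RPC";
        Greeter g = (Greeter) Proxy.newProxyInstance(
            Greeter.class.getClassLoader(), new Class<?>[]{Greeter.class}, handler);
        System.out.println(g.greet("hbase"));
      }
    }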
@@ -2210,8 +2138,6 @@ public class HConnectionManager {
       }
     }
 
-
-
     /**
      * Check the region cache to see whether a region is cached yet or not.
      * Called by unit tests.
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -52,9 +51,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -1191,22 +1188,6 @@ public class HTable implements HTableInterface {
     this.connection.clearRegionCache();
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  @Deprecated
-  public <T extends CoprocessorProtocol> T coprocessorProxy(
-      Class<T> protocol, byte[] row) {
-    return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(),
-        new Class[]{protocol},
-        new ExecRPCInvoker(configuration,
-            connection,
-            protocol,
-            tableName,
-            row));
-  }
-
   /**
    * {@inheritDoc}
    */
@@ -1214,43 +1195,6 @@ public class HTable implements HTableInterface {
     return new RegionCoprocessorRpcChannel(connection, tableName, row);
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  @Deprecated
-  public <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
-      Class<T> protocol, byte[] startKey, byte[] endKey,
-      Batch.Call<T,R> callable)
-      throws IOException, Throwable {
-
-    final Map<byte[],R> results = Collections.synchronizedMap(new TreeMap<byte[],R>(
-        Bytes.BYTES_COMPARATOR));
-    coprocessorExec(protocol, startKey, endKey, callable,
-        new Batch.Callback<R>(){
-      public void update(byte[] region, byte[] row, R value) {
-        results.put(region, value);
-      }
-    });
-    return results;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  @Deprecated
-  public <T extends CoprocessorProtocol, R> void coprocessorExec(
-      Class<T> protocol, byte[] startKey, byte[] endKey,
-      Batch.Call<T,R> callable, Batch.Callback<R> callback)
-      throws IOException, Throwable {
-
-    // get regions covered by the row range
-    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
-    connection.processExecs(protocol, keys, tableName, pool, callable,
-        callback);
-  }
-
   /**
    * {@inheritDoc}
    */
@@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 
 /**
@@ -415,95 +414,6 @@ public interface HTableInterface extends Closeable {
    */
  void unlockRow(RowLock rl) throws IOException;
 
-  /**
-   * Creates and returns a proxy to the CoprocessorProtocol instance running in the
-   * region containing the specified row. The row given does not actually have
-   * to exist. Whichever region would contain the row based on start and end keys will
-   * be used. Note that the {@code row} parameter is also not passed to the
-   * coprocessor handler registered for this protocol, unless the {@code row}
-   * is separately passed as an argument in a proxy method call. The parameter
-   * here is just used to locate the region used to handle the call.
-   *
-   * @param protocol The class or interface defining the remote protocol
-   * @param row The row key used to identify the remote region location
-   * @return A CoprocessorProtocol instance
-   * @deprecated since 0.96. Use {@link HTableInterface#coprocessorService(byte[])} instead.
-   */
-  @Deprecated
-  <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, byte[] row);
-
-  /**
-   * Invoke the passed
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} against
-   * the {@link CoprocessorProtocol} instances running in the selected regions.
-   * All regions beginning with the region containing the <code>startKey</code>
-   * row, through to the region containing the <code>endKey</code> row (inclusive)
-   * will be used. If <code>startKey</code> or <code>endKey</code> is
-   * <code>null</code>, the first and last regions in the table, respectively,
-   * will be used in the range selection.
-   *
-   * @param protocol the CoprocessorProtocol implementation to call
-   * @param startKey start region selection with region containing this row
-   * @param endKey select regions up to and including the region containing
-   * this row
-   * @param callable wraps the CoprocessorProtocol implementation method calls
-   * made per-region
-   * @param <T> CoprocessorProtocol subclass for the remote invocation
-   * @param <R> Return type for the
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
-   * method
-   * @return a <code>Map</code> of region names to
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} return values
-   *
-   * @deprecated since 0.96. Use
-   * {@link HTableInterface#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)} instead.
-   */
-  @Deprecated
-  <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
-      Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable)
-      throws IOException, Throwable;
-
-  /**
-   * Invoke the passed
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} against
-   * the {@link CoprocessorProtocol} instances running in the selected regions.
-   * All regions beginning with the region containing the <code>startKey</code>
-   * row, through to the region containing the <code>endKey</code> row
-   * (inclusive)
-   * will be used. If <code>startKey</code> or <code>endKey</code> is
-   * <code>null</code>, the first and last regions in the table, respectively,
-   * will be used in the range selection.
-   *
-   * <p>
-   * For each result, the given
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
-   * method will be called.
-   *</p>
-   *
-   * @param protocol the CoprocessorProtocol implementation to call
-   * @param startKey start region selection with region containing this row
-   * @param endKey select regions up to and including the region containing
-   * this row
-   * @param callable wraps the CoprocessorProtocol implementation method calls
-   * made per-region
-   * @param callback an instance upon which
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)} with the
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
-   * return value for each region
-   * @param <T> CoprocessorProtocol subclass for the remote invocation
-   * @param <R> Return type for the
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
-   * method
-   *
-   * @deprecated since 0.96.
-   * Use {@link HTableInterface#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)} instead.
-   */
-  @Deprecated
-  <T extends CoprocessorProtocol, R> void coprocessorExec(
-      Class<T> protocol, byte[] startKey, byte[] endKey,
-      Batch.Call<T,R> callable, Batch.Callback<R> callback)
-      throws IOException, Throwable;
-
  /**
   * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the
   * table region containing the specified row. The row given does not actually have
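The deprecation notes above name the replacement: coprocessorService(Class, byte[], byte[], Batch.Call). A minimal sketch of that call pattern, assuming a protobuf-defined endpoint; PingProtos and its request/response types are hypothetical stand-ins for a generated service, while ServerRpcController and BlockingRpcCallback are the stock HBase helpers typically used inside such a Batch.Call:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
    import org.apache.hadoop.hbase.ipc.ServerRpcController;

    public class CoprocessorServiceSketch {
      public static Map<byte[], String> pingAllRegions(HTable table) throws Throwable {
        // One Batch.Call runs per region in the (null, null) = whole-table range,
        // mirroring the region selection the removed coprocessorExec performed.
        return table.coprocessorService(PingProtos.PingService.class, null, null,
            new Batch.Call<PingProtos.PingService, String>() {
              @Override
              public String call(PingProtos.PingService service) throws IOException {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<PingProtos.PingResponse> done =
                    new BlockingRpcCallback<PingProtos.PingResponse>();
                service.ping(controller, PingProtos.PingRequest.getDefaultInstance(), done);
                if (controller.failedOnException()) {
                  throw controller.getFailedOn();  // surface the remote exception
                }
                return done.get().getPong();
              }
            });
      }
    }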
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.PoolMap;
@@ -474,30 +473,6 @@ public class HTablePool implements Closeable {
       table.unlockRow(rl);
     }
 
-    @Override
-    @Deprecated
-    public <T extends CoprocessorProtocol> T coprocessorProxy(
-        Class<T> protocol, byte[] row) {
-      return table.coprocessorProxy(protocol, row);
-    }
-
-    @Override
-    @Deprecated
-    public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
-        Class<T> protocol, byte[] startKey, byte[] endKey,
-        Batch.Call<T, R> callable) throws IOException, Throwable {
-      return table.coprocessorExec(protocol, startKey, endKey, callable);
-    }
-
-    @Override
-    @Deprecated
-    public <T extends CoprocessorProtocol, R> void coprocessorExec(
-        Class<T> protocol, byte[] startKey, byte[] endKey,
-        Batch.Call<T, R> callable, Batch.Callback<R> callback)
-        throws IOException, Throwable {
-      table.coprocessorExec(protocol, startKey, endKey, callable, callback);
-    }
-
     @Override
     public CoprocessorRpcChannel coprocessorService(byte[] row) {
       return table.coprocessorService(row);
@@ -19,18 +19,10 @@
 
 package org.apache.hadoop.hbase.client.coprocessor;
 
-import org.apache.commons.lang.reflect.MethodUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-
 
 /**
@@ -40,95 +32,6 @@ import java.lang.reflect.Proxy;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class Batch {
-  private static Log LOG = LogFactory.getLog(Batch.class);
-
-  /**
-   * Creates a new {@link Batch.Call} instance that invokes a method
-   * with the given parameters and returns the result.
-   *
-   * <p>
-   * Note that currently the method is naively looked up using the method name
-   * and class types of the passed arguments, which means that
-   * <em>none of the arguments can be <code>null</code></em>.
-   * For more flexibility, see
-   * {@link Batch#forMethod(java.lang.reflect.Method, Object...)}.
-   * </p>
-   *
-   * @param protocol the protocol class being called
-   * @param method the method name
-   * @param args zero or more arguments to be passed to the method
-   * (individual args cannot be <code>null</code>!)
-   * @param <T> the class type of the protocol implementation being invoked
-   * @param <R> the return type for the method call
-   * @return a {@code Callable} instance that will invoke the given method
-   * and return the results
-   * @throws NoSuchMethodException if the method named, with the given argument
-   * types, cannot be found in the protocol class
-   * @see Batch#forMethod(java.lang.reflect.Method, Object...)
-   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
-   */
-  @Deprecated
-  public static <T extends CoprocessorProtocol,R> Call<T,R> forMethod(
-      final Class<T> protocol, final String method, final Object... args)
-      throws NoSuchMethodException {
-    Class[] types = new Class[args.length];
-    for (int i=0; i<args.length; i++) {
-      if (args[i] == null) {
-        throw new NullPointerException("Method argument cannot be null");
-      }
-      types[i] = args[i].getClass();
-    }
-
-    Method m = MethodUtils.getMatchingAccessibleMethod(protocol, method, types);
-    if (m == null) {
-      throw new NoSuchMethodException("No matching method found for '" +
-          method + "'");
-    }
-
-    m.setAccessible(true);
-    return forMethod(m, args);
-  }
-
-  /**
-   * Creates a new {@link Batch.Call} instance that invokes a method
-   * with the given parameters and returns the result.
-   *
-   * @param method the method reference to invoke
-   * @param args zero or more arguments to be passed to the method
-   * @param <T> the class type of the protocol implementation being invoked
-   * @param <R> the return type for the method call
-   * @return a {@code Callable} instance that will invoke the given method and
-   * return the results
-   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
-   */
-  @Deprecated
-  public static <T extends CoprocessorProtocol,R> Call<T,R> forMethod(
-      final Method method, final Object... args) {
-    return new Call<T,R>() {
-      public R call(T instance) throws IOException {
-        try {
-          if (Proxy.isProxyClass(instance.getClass())) {
-            InvocationHandler invoker = Proxy.getInvocationHandler(instance);
-            return (R)invoker.invoke(instance, method, args);
-          } else {
-            LOG.warn("Non proxied invocation of method '"+method.getName()+"'!");
-            return (R)method.invoke(instance, args);
-          }
-        }
-        catch (IllegalAccessException iae) {
-          throw new IOException("Unable to invoke method '"+
-              method.getName()+"'", iae);
-        }
-        catch (InvocationTargetException ite) {
-          throw new IOException(ite.toString(), ite);
-        }
-        catch (Throwable t) {
-          throw new IOException(t.toString(), t);
-        }
-      }
-    };
-  }
-
   /**
    * Defines a unit of work to be executed.
    *
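With the reflection-based forMethod helpers gone, Batch keeps only its Call and Callback interfaces; those carry over unchanged to the new API, since coprocessorService(Class, byte[], byte[], Batch.Call) still takes a Batch.Call per region (see the sketch after the HTableInterface hunk above). The removed lookup also explains the old null restriction: forMethod resolved the target Method from args[i].getClass(), so a null argument left it nothing to match on.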
@@ -1,133 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client.coprocessor;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
-import org.apache.hadoop.hbase.ipc.Invocation;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Classes;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Method;
-
-/**
- * Represents an arbitrary method invocation against a Coprocessor
- * instance. In order for a coprocessor implementation to be remotely callable
- * by clients, it must define and implement a {@link CoprocessorProtocol}
- * subclass. Only methods defined in the {@code CoprocessorProtocol} interface
- * will be callable by clients.
- *
- * <p>
- * This class is used internally by
- * {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
- * to wrap the {@code CoprocessorProtocol} method invocations requested in
- * RPC calls. It should not be used directly by HBase clients.
- * </p>
- *
- * @see ExecResult
- * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
- * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
- * @deprecated since 0.96.0. See {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}
- * or related methods instead.
- */
-@Deprecated
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class Exec extends Invocation implements Row {
-  /** Row key used as a reference for any region lookups */
-  private byte[] referenceRow;
-  private Class<? extends CoprocessorProtocol> protocol;
-  private String protocolName;
-
-  public Exec() {
-  }
-
-  public Exec(Configuration configuration,
-      byte[] row,
-      Class<? extends CoprocessorProtocol> protocol,
-      Method method, Object[] parameters) {
-    super(method, parameters);
-    this.conf = configuration;
-    this.referenceRow = row;
-    this.protocol = protocol;
-    this.protocolName = protocol.getName();
-  }
-
-  public String getProtocolName() {
-    return protocolName;
-  }
-
-  public Class<? extends CoprocessorProtocol> getProtocol() {
-    return protocol;
-  }
-
-  public byte[] getRow() {
-    return referenceRow;
-  }
-
-  public int compareTo(Row row) {
-    return Bytes.compareTo(referenceRow, row.getRow());
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    // fields for Invocation
-    out.writeUTF(this.methodName);
-    out.writeInt(parameterClasses.length);
-    for (int i = 0; i < parameterClasses.length; i++) {
-      HbaseObjectWritable.writeObject(out, parameters[i],
-          parameters[i] != null ? parameters[i].getClass() : parameterClasses[i],
-          conf);
-      out.writeUTF(parameterClasses[i].getName());
-    }
-    // fields for Exec
-    Bytes.writeByteArray(out, referenceRow);
-    out.writeUTF(protocol.getName());
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    // fields for Invocation
-    methodName = in.readUTF();
-    parameters = new Object[in.readInt()];
-    parameterClasses = new Class[parameters.length];
-    HbaseObjectWritable objectWritable = new HbaseObjectWritable();
-    for (int i = 0; i < parameters.length; i++) {
-      parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
-        this.conf);
-      String parameterClassName = in.readUTF();
-      try {
-        parameterClasses[i] = Classes.extendedForName(parameterClassName);
-      } catch (ClassNotFoundException e) {
-        throw new IOException("Couldn't find class: " + parameterClassName);
-      }
-    }
-    // fields for Exec
-    referenceRow = Bytes.readByteArray(in);
-    protocolName = in.readUTF();
-  }
-}
@@ -1,87 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client.coprocessor;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Classes;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * Represents the return value from a
- * {@link org.apache.hadoop.hbase.client.coprocessor.Exec} invocation.
- * This simply wraps the value for easier
- * {@link org.apache.hadoop.hbase.io.HbaseObjectWritable}
- * serialization.
- *
- * <p>
- * This class is used internally by the HBase client code to properly serialize
- * responses from {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
- * method invocations. It should not be used directly by clients.
- * </p>
- *
- * @see Exec
- * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
- * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
- * @deprecated since 0.96.0. See {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}
- * or related methods instead.
- */
-@Deprecated
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class ExecResult implements Writable {
-  private byte[] regionName;
-  private Object value;
-
-  public ExecResult() {
-  }
-
-  public ExecResult(byte[] region, Object value) {
-    this.regionName = region;
-    this.value = value;
-  }
-
-  public byte[] getRegionName() {
-    return regionName;
-  }
-
-  public Object getValue() {
-    return value;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    Bytes.writeByteArray(out, regionName);
-    HbaseObjectWritable.writeObject(out, value,
-        value != null ? value.getClass() : Writable.class, null);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    regionName = Bytes.readByteArray(in);
-    value = HbaseObjectWritable.readObject(in, null);
-  }
-}
@@ -1,81 +0,0 @@
-/*
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.coprocessor;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
-import org.apache.hadoop.hbase.ipc.ProtocolSignature;
-import org.apache.hadoop.hbase.ipc.VersionedProtocol;
-
-/**
- * This abstract class provides default implementation of an Endpoint.
- * It also maintains a CoprocessorEnvironment object which can be
- * used to access region resource.
- *
- * It's recommended to use this abstract class to implement your Endpoint.
- * However you still can just implement the interface CoprocessorProtocol
- * and Coprocessor to develop an Endpoint. But you won't be able to access
- * the region related resource, i.e., CoprocessorEnvironment.
- * @deprecated CoprocessorProtocol is going away in 0.96
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public abstract class BaseEndpointCoprocessor implements Coprocessor,
-    CoprocessorProtocol, VersionedProtocol {
-  /**
-   * This Interfaces' version. Version changes when the Interface changes.
-   */
-  // All HBase Interfaces used derive from HBaseRPCProtocolVersion. It
-  // maintained a single global version number on all HBase Interfaces. This
-  // meant all HBase RPC was broke though only one of the three RPC Interfaces
-  // had changed. This has since been undone.
-  public static final long VERSION = 28L;
-
-  private CoprocessorEnvironment env;
-
-  /**
-   * @return env Coprocessor environment.
-   */
-  public CoprocessorEnvironment getEnvironment() {
-    return env;
-  }
-
-  @Override
-  public void start(CoprocessorEnvironment env) {
-    this.env = env;
-  }
-
-  @Override
-  public void stop(CoprocessorEnvironment env) { }
-
-  @Override
-  public ProtocolSignature getProtocolSignature(
-      String protocol, long version, int clientMethodsHashCode)
-      throws IOException {
-    return new ProtocolSignature(VERSION, null);
-  }
-
-  @Override
-  public long getProtocolVersion(String protocol, long clientVersion)
-      throws IOException {
-    return VERSION;
-  }
-}
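With BaseEndpointCoprocessor deleted, an endpoint is no longer a CoprocessorProtocol + VersionedProtocol implementation with a hand-maintained version constant; it extends its generated protobuf service and implements Coprocessor and CoprocessorService, whose getService() hands the Service to the region for registration. A sketch of that shape, again assuming a hypothetical generated PingProtos.PingService:

    import java.io.IOException;
    import com.google.protobuf.RpcCallback;
    import com.google.protobuf.RpcController;
    import com.google.protobuf.Service;
    import org.apache.hadoop.hbase.Coprocessor;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorService;

    public class PingEndpoint extends PingProtos.PingService
        implements Coprocessor, CoprocessorService {

      private CoprocessorEnvironment env;  // region resources, as in the old base class

      @Override
      public Service getService() {
        return this;  // the framework registers this generated Service for the region
      }

      @Override
      public void ping(RpcController controller, PingProtos.PingRequest request,
          RpcCallback<PingProtos.PingResponse> done) {
        // Request/response types define the wire format; no Writable rules apply.
        done.run(PingProtos.PingResponse.newBuilder().setPong("pong").build());
      }

      @Override
      public void start(CoprocessorEnvironment env) throws IOException { this.env = env; }

      @Override
      public void stop(CoprocessorEnvironment env) throws IOException { }
    }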
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
@@ -586,29 +585,6 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
       return table.get(gets);
     }
 
-    @Override
-    @Deprecated
-    public <T extends CoprocessorProtocol, R> void coprocessorExec(Class<T> protocol,
-        byte[] startKey, byte[] endKey, Batch.Call<T, R> callable,
-        Batch.Callback<R> callback) throws IOException, Throwable {
-      table.coprocessorExec(protocol, startKey, endKey, callable, callback);
-    }
-
-    @Override
-    @Deprecated
-    public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
-        Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
-        throws IOException, Throwable {
-      return table.coprocessorExec(protocol, startKey, endKey, callable);
-    }
-
-    @Override
-    @Deprecated
-    public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
-        byte[] row) {
-      return table.coprocessorProxy(protocol, row);
-    }
-
     @Override
     public CoprocessorRpcChannel coprocessorService(byte[] row) {
       return table.coprocessorService(row);
@@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.BitComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
@@ -238,7 +237,8 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configurable {
     addToMap(MultiResponse.class, code++);
 
     // coprocessor execution
-    addToMap(Exec.class, code++);
+    // Exec no longer exists --> addToMap(Exec.class, code++);
+    code++;
     addToMap(Increment.class, code++);
 
     addToMap(KeyOnlyFilter.class, code++);
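The replacement above is deliberate: addToMap assigns each class the next integer code, so deleting the Exec entry outright would shift the code of Increment and every class registered after it, breaking compatibility with data already serialized under the old numbering. The bare code++ burns the now-unused slot so all subsequent codes keep their previous values.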
@@ -1,46 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * All custom RPC protocols to be exported by Coprocessors must extend this interface.
- *
- * <p>
- * <strong>Note that all callable methods must have a return type handled by
- * {@link org.apache.hadoop.hbase.io.HbaseObjectWritable#writeObject(java.io.DataOutput, Object, Class, org.apache.hadoop.conf.Configuration)}.</strong>
- * That is:
- * <ul>
- * <li>a Java primitive type ({@code int}, {@code float}, etc)</li>
- * <li>a Java {@code String}</li>
- * <li>a {@link org.apache.hadoop.io.Writable}</li>
- * <li>an array or {@code java.util.List} of one of the above</li>
- * </ul>
- * </p>
- * @deprecated since 0.96. Use {@link org.apache.hadoop.hbase.coprocessor.CoprocessorService}
- * instead.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-@Deprecated
-public interface CoprocessorProtocol extends VersionedProtocol {
-  public static final long VERSION = 1L;
-}
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.ServerCallable;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
-import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Backs a {@link CoprocessorProtocol} subclass proxy and forwards method
- * invocations for server execution. Note that internally this will issue a
- * separate RPC call for each method invocation (using a
- * {@link org.apache.hadoop.hbase.client.ServerCallable} instance).
- */
-@InterfaceAudience.Private
-@Deprecated
-public class ExecRPCInvoker implements InvocationHandler {
-  // LOG is NOT in hbase subpackage intentionally so that the default HBase
-  // DEBUG log level does NOT emit RPC-level logging.
-  private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.ExecRPCInvoker");
-
-  private Configuration conf;
-  private final HConnection connection;
-  private Class<? extends CoprocessorProtocol> protocol;
-  private final byte[] table;
-  private final byte[] row;
-  private byte[] regionName;
-
-  public ExecRPCInvoker(Configuration conf,
-      HConnection connection,
-      Class<? extends CoprocessorProtocol> protocol,
-      byte[] table,
-      byte[] row) {
-    this.conf = conf;
-    this.connection = connection;
-    this.protocol = protocol;
-    this.table = table;
-    this.row = row;
-  }
-
-  @Override
-  public Object invoke(Object instance, final Method method, final Object[] args)
-      throws Throwable {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Call: "+method.getName()+", "+(args != null ? args.length : 0));
-    }
-
-    if (row != null) {
-      final Exec exec = new Exec(conf, row, protocol, method, args);
-      ServerCallable<ExecResult> callable =
-        new ServerCallable<ExecResult>(connection, table, row) {
-          public ExecResult call() throws Exception {
-            byte[] regionName = location.getRegionInfo().getRegionName();
-            return ProtobufUtil.execCoprocessor(server, exec, regionName);
-          }
-        };
-      ExecResult result = callable.withRetries();
-      this.regionName = result.getRegionName();
-      LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
-        ", value="+result.getValue());
-      return result.getValue();
-    } else if (LOG.isDebugEnabled()) {
-      LOG.debug("Null row passed for call");
-    }
-
-    return null;
-  }
-
-  public byte[] getRegionName() {
-    return regionName;
-  }
-}
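ExecRPCInvoker's job, turning each interface method call into one region-targeted RPC, passes to the CoprocessorRpcChannel returned by coprocessorService(byte[] row); a generated stub over that channel gives the same one-call-one-RPC behavior without java.lang.reflect. A sketch under the same assumption of a hypothetical generated PingProtos.PingService, and assuming the channel is usable as a blocking RPC channel as generated blocking stubs expect:

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;

    public class RpcChannelSketch {
      public static String pingRow(HTable table, byte[] row) throws Exception {
        // The channel routes each stub call to the region containing 'row',
        // much as the removed invoker did per proxied method call.
        CoprocessorRpcChannel channel = table.coprocessorService(row);
        PingProtos.PingService.BlockingInterface stub =
            PingProtos.PingService.newBlockingStub(channel);
        return stub.ping(null, PingProtos.PingRequest.getDefaultInstance()).getPong();
      }
    }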
@@ -31,7 +31,6 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,11 +38,8 @@ import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.TreeMap;
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DeserializationException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -66,14 +62,11 @@ import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowLock;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
-import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
@@ -102,8 +95,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
@@ -115,7 +106,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
 import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -133,23 +123,23 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hbase.Cell;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcChannel;
 import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.token.Token;
 
 /**
  * Protobufs utility.
  */
-@SuppressWarnings("deprecation")
 public final class ProtobufUtil {
 
   private ProtobufUtil() {
@@ -234,53 +224,6 @@ public final class ProtobufUtil {
     return e instanceof IOException ? (IOException) e : new IOException(se);
   }
 
-  /**
-   * Convert a protocol buffer Exec to a client Exec
-   *
-   * @param proto the protocol buffer Exec to convert
-   * @return the converted client Exec
-   */
-  @SuppressWarnings("unchecked")
-  @Deprecated
-  public static Exec toExec(
-      final ClientProtos.Exec proto) throws IOException {
-    byte[] row = proto.getRow().toByteArray();
-    String protocolName = proto.getProtocolName();
-    String methodName = proto.getMethodName();
-    List<Object> parameters = new ArrayList<Object>();
-    Class<? extends CoprocessorProtocol> protocol = null;
-    Method method = null;
-    try {
-      List<Class<?>> types = new ArrayList<Class<?>>();
-      for (NameBytesPair parameter: proto.getParameterList()) {
-        String type = parameter.getName();
-        Class<?> declaredClass = PRIMITIVES.get(type);
-        if (declaredClass == null) {
-          declaredClass = Class.forName(parameter.getName());
-        }
-        parameters.add(toObject(parameter));
-        types.add(declaredClass);
-      }
-      Class<?> [] parameterTypes = new Class<?> [types.size()];
-      types.toArray(parameterTypes);
-      protocol = (Class<? extends CoprocessorProtocol>)
-        Class.forName(protocolName);
-      method = protocol.getMethod(methodName, parameterTypes);
-    } catch (NoSuchMethodException nsme) {
-      throw new IOException(nsme);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    }
-    Configuration conf = HBaseConfiguration.create();
-    for (NameStringPair p: proto.getPropertyList()) {
-      conf.set(p.getName(), p.getValue());
-    }
-    Object[] parameterObjects = new Object[parameters.size()];
-    parameters.toArray(parameterObjects);
-    return new Exec(conf, row, protocol,
-      method, parameterObjects);
-  }
-
   /**
    * Convert a ServerName to a protocol buffer ServerName
    *
@@ -748,43 +691,6 @@ public final class ProtobufUtil {
     return scan;
   }
 
-
-  /**
-   * Create a new protocol buffer Exec based on a client Exec
-   *
-   * @param exec
-   * @return a ClientProtos.Exec
-   * @throws IOException
-   */
-  public static ClientProtos.Exec toExec(
-      final Exec exec) throws IOException {
-    ClientProtos.Exec.Builder
-      builder = ClientProtos.Exec.newBuilder();
-    Configuration conf = exec.getConf();
-    if (conf != null) {
-      NameStringPair.Builder propertyBuilder = NameStringPair.newBuilder();
-      Iterator<Entry<String, String>> iterator = conf.iterator();
-      while (iterator.hasNext()) {
-        Entry<String, String> entry = iterator.next();
-        propertyBuilder.setName(entry.getKey());
-        propertyBuilder.setValue(entry.getValue());
-        builder.addProperty(propertyBuilder.build());
-      }
-    }
-    builder.setProtocolName(exec.getProtocolName());
-    builder.setMethodName(exec.getMethodName());
-    builder.setRow(ByteString.copyFrom(exec.getRow()));
-    Object[] parameters = exec.getParameters();
-    if (parameters != null && parameters.length > 0) {
-      Class<?>[] declaredClasses = exec.getParameterClasses();
-      for (int i = 0, n = parameters.length; i < n; i++) {
-        builder.addParameter(
-          ProtobufUtil.toParameter(declaredClasses[i], parameters[i]));
-      }
-    }
-    return builder.build();
-  }
-
   /**
    * Create a protocol buffer Get based on a client Get.
    *
@@ -1318,29 +1224,6 @@ public final class ProtobufUtil {
     }
   }
 
-  /**
-   * A helper to exec a coprocessor Exec using client protocol.
-   *
-   * @param client
-   * @param exec
-   * @param regionName
-   * @return the exec result
-   * @throws IOException
-   */
-  public static ExecResult execCoprocessor(final ClientProtocol client,
-      final Exec exec, final byte[] regionName) throws IOException {
-    ExecCoprocessorRequest request =
-      RequestConverter.buildExecCoprocessorRequest(regionName, exec);
-    try {
-      ExecCoprocessorResponse response =
-        client.execCoprocessor(null, request);
-      Object value = ProtobufUtil.toObject(response.getValue());
-      return new ExecResult(regionName, value);
-    } catch (ServiceException se) {
-      throw getRemoteException(se);
-    }
-  }
-
   public static CoprocessorServiceResponse execService(final ClientProtocol client,
       final CoprocessorServiceCall call, final byte[] regionName) throws IOException {
     CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder()
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
@@ -67,7 +66,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
@@ -117,7 +115,6 @@ import com.google.protobuf.ByteString;
  * or build components for protocol buffer requests.
  */
 @InterfaceAudience.Private
-@SuppressWarnings("deprecation")
 public final class RequestConverter {
 
   private RequestConverter() {
@ -496,24 +493,6 @@ public final class RequestConverter {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer coprocessor exec request
|
||||
*
|
||||
* @param regionName
|
||||
* @param exec
|
||||
* @return a coprocessor exec request
|
||||
* @throws IOException
|
||||
*/
|
||||
public static ExecCoprocessorRequest buildExecCoprocessorRequest(
|
||||
final byte[] regionName, final Exec exec) throws IOException {
|
||||
ExecCoprocessorRequest.Builder builder = ExecCoprocessorRequest.newBuilder();
|
||||
RegionSpecifier region = buildRegionSpecifier(
|
||||
RegionSpecifierType.REGION_NAME, regionName);
|
||||
builder.setRegion(region);
|
||||
builder.setCall(ProtobufUtil.toExec(exec));
|
||||
return builder.build();
|
||||
}
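On the wire, the Service-based analogue of this builder populates a CoprocessorServiceCall instead of an Exec. A rough sketch using the message fields this commit keeps (the service name, method name, and payload below are placeholders):

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
    import org.apache.hadoop.hbase.util.Bytes;

    // The call carries a row (for region routing), the service and method
    // names, and the serialized request message as opaque bytes.
    CoprocessorServiceCall call = CoprocessorServiceCall.newBuilder()
        .setRow(ByteString.copyFrom(Bytes.toBytes("row1")))
        .setServiceName("PingService")   // placeholder service name
        .setMethodName("ping")           // placeholder method name
        .setRequest(ByteString.EMPTY)    // placeholder serialized request
        .build();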

  /**
   * Create a protocol buffer multi request for a list of actions.
   * RowMutations in the list (if any) will be ignored.
@@ -539,8 +518,6 @@ public final class RequestConverter {
      protoAction.setMutate(ProtobufUtil.toMutate(MutateType.PUT, (Put)row));
    } else if (row instanceof Delete) {
      protoAction.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, (Delete)row));
    } else if (row instanceof Exec) {
      protoAction.setExec(ProtobufUtil.toExec((Exec)row));
    } else if (row instanceof Append) {
      protoAction.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, (Append)row));
    } else if (row instanceof Increment) {

@@ -24,8 +24,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.util.AbstractList;
import java.util.ArrayList;
@@ -62,7 +60,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.protobuf.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -100,23 +97,21 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -138,14 +133,15 @@ import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter;

import com.google.common.base.Preconditions;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MutableClassToInstanceMap;
import com.google.common.io.Closeables;

import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

/**
 * HRegion stores data for a certain region of a table. It stores all columns
@@ -212,13 +208,6 @@ public class HRegion implements HeapSize { // , Writable{
  protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
      Bytes.BYTES_RAWCOMPARATOR);

  // Registered region protocol handlers
  private ClassToInstanceMap<CoprocessorProtocol>
    protocolHandlers = MutableClassToInstanceMap.create();

  private Map<String, Class<? extends CoprocessorProtocol>>
    protocolHandlerNames = Maps.newHashMap();

  // TODO: account for each registered handler in HeapSize computation
  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

@@ -2093,10 +2082,6 @@ public class HRegion implements HeapSize { // , Writable{
    Set<byte[]> deletesCfSet = null;

    WALEdit walEdit = new WALEdit();

    long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();


    MultiVersionConsistencyControl.WriteEntry w = null;
    long txid = 0;
    boolean walSyncSuccessful = false;
@@ -2365,7 +2350,6 @@ public class HRegion implements HeapSize { // , Writable{

    if (noOfPuts > 0) {
      // There were some Puts in the batch.
      double noOfMutations = noOfPuts + noOfDeletes;
      if (this.metricsRegion != null) {
        this.metricsRegion.updatePut();
      }
@@ -4899,7 +4883,7 @@ public class HRegion implements HeapSize { // , Writable{
  public static final long FIXED_OVERHEAD = ClassSize.align(
      ClassSize.OBJECT +
      ClassSize.ARRAY +
      41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
      39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
      (9 * Bytes.SIZEOF_LONG) +
      Bytes.SIZEOF_BOOLEAN);

@@ -4940,47 +4924,6 @@ public class HRegion implements HeapSize { // , Writable{
      System.exit(1);
    }

  /**
   * Registers a new CoprocessorProtocol subclass and instance to
   * be available for handling {@link HRegion#exec(Exec)} calls.
   *
   * <p>
   * Only a single protocol type/handler combination may be registered per
   * region.
   * After the first registration, subsequent calls with the same protocol type
   * will fail with a return value of {@code false}.
   * </p>
   * @param protocol a {@code CoprocessorProtocol} subinterface defining the
   *     protocol methods
   * @param handler an instance implementing the interface
   * @param <T> the protocol type
   * @return {@code true} if the registration was successful, {@code false}
   *     otherwise
   */
  @Deprecated
  public <T extends CoprocessorProtocol> boolean registerProtocol(
      Class<T> protocol, T handler) {

    /* No stacking of protocol handlers is currently allowed. The
     * first to claim wins!
     */
    if (protocolHandlers.containsKey(protocol)) {
      LOG.error("Protocol "+protocol.getName()+
          " already registered, rejecting request from "+
          handler
      );
      return false;
    }

    protocolHandlers.putInstance(protocol, handler);
    protocolHandlerNames.put(protocol.getName(), protocol);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered protocol handler: region="+
          Bytes.toStringBinary(getRegionName())+" protocol="+protocol.getName());
    }
    return true;
  }
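Where registerProtocol keys handlers by Java interface, the surviving registerService path (next hunk) keys them by service name and likewise returns a boolean on conflict. A minimal sketch of registering an endpoint with it, assuming a hypothetical PingServiceImpl extending a generated protobuf Service:

    // Sketch only: PingServiceImpl is an assumed implementation of a
    // generated PingProtos.PingService; registerService is the method kept.
    Service endpoint = new PingServiceImpl();
    boolean registered = region.registerService(endpoint);
    if (!registered) {
      // A handler under the same service name was already present.
    }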

  /**
   * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
   * be available for handling
@@ -5016,73 +4959,6 @@ public class HRegion implements HeapSize { // , Writable{
    return true;
  }

  /**
   * Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
   * method using the registered protocol handlers.
   * {@link CoprocessorProtocol} implementations must be registered via the
   * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)}
   * method before they are available.
   *
   * @param call an {@code Exec} instance identifying the protocol, method name,
   *     and parameters for the method invocation
   * @return an {@code ExecResult} instance containing the region name of the
   *     invocation and the return value
   * @throws IOException if no registered protocol handler is found or an error
   *     occurs during the invocation
   * @see org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)
   */
  @Deprecated
  public ExecResult exec(Exec call)
      throws IOException {
    Class<? extends CoprocessorProtocol> protocol = call.getProtocol();
    if (protocol == null) {
      String protocolName = call.getProtocolName();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Received dynamic protocol exec call with protocolName " + protocolName);
      }
      // detect the actual protocol class
      protocol = protocolHandlerNames.get(protocolName);
      if (protocol == null) {
        throw new HBaseRPC.UnknownProtocolException(null,
            "No matching handler for protocol "+protocolName+
            " in region "+Bytes.toStringBinary(getRegionName()));
      }
    }
    if (!protocolHandlers.containsKey(protocol)) {
      throw new HBaseRPC.UnknownProtocolException(protocol,
          "No matching handler for protocol "+protocol.getName()+
          " in region "+Bytes.toStringBinary(getRegionName()));
    }

    CoprocessorProtocol handler = protocolHandlers.getInstance(protocol);
    Object value;

    try {
      Method method = protocol.getMethod(
          call.getMethodName(), call.getParameterClasses());
      method.setAccessible(true);

      value = method.invoke(handler, call.getParameters());
    } catch (InvocationTargetException e) {
      Throwable target = e.getTargetException();
      if (target instanceof IOException) {
        throw (IOException)target;
      }
      IOException ioe = new IOException(target.toString());
      ioe.setStackTrace(target.getStackTrace());
      throw ioe;
    } catch (Throwable e) {
      if (!(e instanceof IOException)) {
        LOG.error("Unexpected throwable object ", e);
      }
      IOException ioe = new IOException(e.toString());
      ioe.setStackTrace(e.getStackTrace());
      throw ioe;
    }

    return new ExecResult(getRegionName(), value);
  }
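The reflection dispatch above is what descriptor-based dispatch replaces. A rough sketch of the Service-side lookup, under the assumption that handlers sit in the coprocessorServiceHandlers map shown earlier, keyed by service name (this is an illustration, not the committed execService body):

    import com.google.protobuf.Descriptors;
    import com.google.protobuf.Message;
    import com.google.protobuf.Service;

    // Resolve the method on the protobuf service descriptor rather than via
    // java.lang.reflect, then parse the request payload with the generated
    // prototype for that method.
    Service service = coprocessorServiceHandlers.get(call.getServiceName());
    Descriptors.MethodDescriptor method =
        service.getDescriptorForType().findMethodByName(call.getMethodName());
    Message request = service.getRequestPrototype(method).newBuilderForType()
        .mergeFrom(call.getRequest()).build();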

  /**
   * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
   * the registered protocol handlers. {@link Service} implementations must be registered via the

@@ -52,7 +52,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.management.ObjectName;

import com.google.protobuf.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -73,7 +72,6 @@ import org.apache.hadoop.hbase.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.RegionMovedException;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
@@ -96,18 +94,15 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.MetricsHBaseServer;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -147,8 +142,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequ
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
@@ -186,8 +181,8 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -218,12 +213,10 @@ import org.cliffc.high_scale_lib.Counter;

import com.google.common.base.Function;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -3140,27 +3133,6 @@ public class HRegionServer implements ClientProtocol,
    }
  }

  /**
   * Executes a single method using protobuf.
   */
  @Override
  public ExecCoprocessorResponse execCoprocessor(final RpcController controller,
      final ExecCoprocessorRequest request) throws ServiceException {
    try {
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      ExecCoprocessorResponse.Builder
        builder = ExecCoprocessorResponse.newBuilder();
      ClientProtos.Exec call = request.getCall();
      Exec clientCall = ProtobufUtil.toExec(call);
      ExecResult result = region.exec(clientCall);
      builder.setValue(ProtobufUtil.toParameter(result.getValue()));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
      final CoprocessorServiceRequest request) throws ServiceException {
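The trailing context above shows the signature of the Service-based sibling that stays. For orientation, a rough sketch of how such an override typically completes, mirroring the error-handling shape of the removed method; toServiceResponse is a hypothetical helper, not code from this commit:

    @Override
    public CoprocessorServiceResponse execService(final RpcController controller,
        final CoprocessorServiceRequest request) throws ServiceException {
      try {
        requestCount.increment();
        HRegion region = getRegion(request.getRegion());
        // Delegate to the region's descriptor-based dispatcher.
        Message result = region.execService(controller, request.getCall());
        return toServiceResponse(region, result);  // hypothetical helper
      } catch (IOException ie) {
        throw new ServiceException(ie);
      }
    }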
@@ -3255,9 +3227,6 @@ public class HRegionServer implements ClientProtocol,
        if (r != null) {
          result = ProtobufUtil.toResult(r);
        }
      } else if (actionUnion.hasExec()) {
        Exec call = ProtobufUtil.toExec(actionUnion.getExec());
        result = region.exec(call).getValue();
      } else {
        LOG.warn("Error: invalid action: " + actionUnion + ". "
          + "it must be a Get, Mutate, or Exec.");

@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@@ -214,11 +213,6 @@ public class RegionCoprocessorHost
    // It uses a visitor pattern to invoke registered Endpoint
    // method.
    for (Class c : implClass.getInterfaces()) {
      if (CoprocessorProtocol.class.isAssignableFrom(c)) {
        region.registerProtocol(c, (CoprocessorProtocol)instance);
      }
      // we allow endpoints to register as both CoprocessorProtocols and Services
      // for ease of transition
      if (CoprocessorService.class.isAssignableFrom(c)) {
        region.registerService( ((CoprocessorService)instance).getService() );
      }
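The transition idiom above lets one endpoint register under both mechanisms. A minimal sketch of the Service half, assuming a generated PingProtos.PingService base class (all Ping* names are illustrative):

    import com.google.protobuf.RpcCallback;
    import com.google.protobuf.RpcController;
    import com.google.protobuf.Service;
    import org.apache.hadoop.hbase.Coprocessor;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorService;

    // Implements CoprocessorService so the host loop above can register it
    // via getService(); the generated base class supplies the Service API.
    public class PingEndpoint extends PingProtos.PingService
        implements CoprocessorService, Coprocessor {
      @Override
      public Service getService() {
        return this;
      }
      @Override
      public void ping(RpcController controller, PingProtos.PingRequest request,
          RpcCallback<PingProtos.PingResponse> done) {
        done.run(PingProtos.PingResponse.getDefaultInstance());
      }
      @Override
      public void start(CoprocessorEnvironment env) { }
      @Override
      public void stop(CoprocessorEnvironment env) { }
    }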

@@ -35,7 +35,6 @@ import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.util.StringUtils;

@@ -725,32 +724,6 @@ public class RemoteHTable implements HTableInterface {
    throw new IOException("batchCallback not supported");
  }

  @Override
  @Deprecated
  public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
      byte[] row) {
    throw new
      UnsupportedOperationException("coprocessorProxy not implemented");
  }

  @Override
  @Deprecated
  public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
      Class<T> protocol, byte[] startKey, byte[] endKey,
      Batch.Call<T, R> callable)
      throws IOException, Throwable {
    throw new UnsupportedOperationException("coprocessorExec not implemented");
  }

  @Override
  @Deprecated
  public <T extends CoprocessorProtocol, R> void coprocessorExec(
      Class<T> protocol, byte[] startKey, byte[] endKey,
      Batch.Call<T, R> callable, Batch.Callback<R> callback)
      throws IOException, Throwable {
    throw new UnsupportedOperationException("coprocessorExec not implemented");
  }

  @Override
  public CoprocessorRpcChannel coprocessorService(byte[] row) {
    throw new UnsupportedOperationException("coprocessorService not implemented");
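For contrast with the stubs above, the deprecated client pattern these methods belonged to looked roughly like this; the signatures match those shown, while MyProtocol and count() are illustrative stand-ins for a user-defined CoprocessorProtocol subinterface:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;

    // Deprecated dynamic-RPC call: fans out over the regions in the key
    // range and keys results by region start key.
    Map<byte[], Long> results = table.coprocessorExec(
        MyProtocol.class,                 // illustrative protocol interface
        startRow, endRow,
        new Batch.Call<MyProtocol, Long>() {
          public Long call(MyProtocol instance) throws IOException {
            return instance.count();      // illustrative endpoint method
          }
        });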

@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BitComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
@@ -536,7 +535,6 @@ public class TestHbaseObjectWritable extends TestCase {
    assertEquals(67,HbaseObjectWritable.getClassCode(MultiResponse.class).intValue());

    // coprocessor execution
    assertEquals(68,HbaseObjectWritable.getClassCode(Exec.class).intValue());
    assertEquals(69,HbaseObjectWritable.getClassCode(Increment.class).intValue());

    assertEquals(70,HbaseObjectWritable.getClassCode(KeyOnlyFilter.class).intValue());

@@ -65,8 +65,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
@@ -406,13 +404,6 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
    return null;
  }

  @Override
  public ExecCoprocessorResponse execCoprocessor(RpcController controller,
      ExecCoprocessorRequest request) throws ServiceException {
    // TODO Auto-generated method stub
    return null;
  }

  @Override
  public ClientProtos.CoprocessorServiceResponse execService(RpcController controller,
      ClientProtos.CoprocessorServiceRequest request) throws ServiceException {

@@ -29,8 +29,6 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;