HBASE-5448 Support dynamic coprocessor endpoints with protobuf based RPC

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1387001 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Helmling 2012-09-18 06:32:57 +00:00
parent 60449458da
commit fbfbad4e4d
36 changed files with 8819 additions and 297 deletions

View File

@ -2140,6 +2140,7 @@ public class HConnectionManager {
* @param <R> the callable's return type
* @throws IOException
*/
@Deprecated
public <T extends CoprocessorProtocol,R> void processExecs(
final Class<T> protocol,
List<byte[]> rows,

View File

@ -32,11 +32,17 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -56,6 +62,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.io.DataInputInputStream;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@ -1318,6 +1325,7 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
@Deprecated
public <T extends CoprocessorProtocol> T coprocessorProxy(
Class<T> protocol, byte[] row) {
return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(),
@ -1329,10 +1337,18 @@ public class HTable implements HTableInterface {
row));
}
/**
* {@inheritDoc}
*/
public CoprocessorRpcChannel coprocessorService(byte[] row) {
return new CoprocessorRpcChannel(connection, tableName, row);
}
/**
* {@inheritDoc}
*/
@Override
@Deprecated
public <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
Class<T> protocol, byte[] startKey, byte[] endKey,
Batch.Call<T,R> callable)
@ -1353,6 +1369,7 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
@Deprecated
public <T extends CoprocessorProtocol, R> void coprocessorExec(
Class<T> protocol, byte[] startKey, byte[] endKey,
Batch.Call<T,R> callable, Batch.Callback<R> callback)
@ -1364,6 +1381,75 @@ public class HTable implements HTableInterface {
callback);
}
/**
* {@inheritDoc}
*/
@Override
public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
throws ServiceException, Throwable {
final Map<byte[],R> results = new ConcurrentSkipListMap<byte[], R>(Bytes.BYTES_COMPARATOR);
coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
public void update(byte[] region, byte[] row, R value) {
if (value == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Call to " + service.getName() +
" received NULL value from Batch.Call for region " + Bytes.toStringBinary(region));
}
} else {
results.put(region, value);
}
}
});
return results;
}
/**
* {@inheritDoc}
*/
@Override
public <T extends Service, R> void coprocessorService(final Class<T> service,
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
final Batch.Callback<R> callback) throws ServiceException, Throwable {
// get regions covered by the row range
List<byte[]> keys = getStartKeysInRange(startKey, endKey);
Map<byte[],Future<R>> futures =
new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
for (final byte[] r : keys) {
final CoprocessorRpcChannel channel =
new CoprocessorRpcChannel(connection, tableName, r);
Future<R> future = pool.submit(
new Callable<R>() {
public R call() throws Exception {
T instance = ProtobufUtil.newServiceStub(service, channel);
R result = callable.call(instance);
byte[] region = channel.getLastRegion();
if (callback != null) {
callback.update(region, r, result);
}
return result;
}
});
futures.put(r, future);
}
for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
try {
e.getValue().get();
} catch (ExecutionException ee) {
LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
+ Bytes.toStringBinary(e.getKey()), ee);
throw ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
+ " for row " + Bytes.toStringBinary(e.getKey()))
.initCause(ie);
}
}
}
private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
throws IOException {
Pair<byte[][],byte[][]> startEndKeys = getStartEndKeys();

View File

@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -30,6 +32,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
/**
* Used to communicate with a single HBase table.
@ -425,7 +428,9 @@ public interface HTableInterface extends Closeable {
* @param protocol The class or interface defining the remote protocol
* @param row The row key used to identify the remote region location
* @return A CoprocessorProtocol instance
* @deprecated since 0.96. Use {@link HTableInterface#coprocessorService(byte[])} instead.
*/
@Deprecated
<T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, byte[] row);
/**
@ -450,7 +455,11 @@ public interface HTableInterface extends Closeable {
* method
* @return a <code>Map</code> of region names to
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} return values
*
* @deprecated since 0.96. Use
* {@link HTableInterface#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)} instead.
*/
@Deprecated
<T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable)
throws IOException, Throwable;
@ -486,12 +495,96 @@ public interface HTableInterface extends Closeable {
* @param <R> Return type for the
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
* method
*
* @deprecated since 0.96.
* Use {@link HTableInterface#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)} instead.
*/
@Deprecated
<T extends CoprocessorProtocol, R> void coprocessorExec(
Class<T> protocol, byte[] startKey, byte[] endKey,
Batch.Call<T,R> callable, Batch.Callback<R> callback)
throws IOException, Throwable;
/**
* Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the
* table region containing the specified row. The row given does not actually have
* to exist. Whichever region would contain the row based on start and end keys will
* be used. Note that the {@code row} parameter is also not passed to the
* coprocessor handler registered for this protocol, unless the {@code row}
* is separately passed as an argument in the service request. The parameter
* here is only used to locate the region used to handle the call.
*
* <p>
* The obtained {@link com.google.protobuf.RpcChannel} instance can be used to access a published
* coprocessor {@link com.google.protobuf.Service} using standard protobuf service invocations:
* </p>
*
* <div style="background-color: #cccccc; padding: 2px">
* <blockquote><pre>
* CoprocessorRpcChannel channel = myTable.coprocessorService(rowkey);
* MyService.BlockingInterface service = MyService.newBlockingStub(channel);
* MyCallRequest request = MyCallRequest.newBuilder()
* ...
* .build();
* MyCallResponse response = service.myCall(null, request);
* </pre></blockquote></div>
*
* @param row The row key used to identify the remote region location
* @return A CoprocessorRpcChannel instance
*/
CoprocessorRpcChannel coprocessorService(byte[] row);
/**
* Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive),
* and invokes the passed {@link Batch.Call#call(Object)} method with each {@link Service}
* instance.
*
* @param service the protocol buffer {@code Service} implementation to call
* @param startKey start region selection with region containing this row. If {@code null}, the
* selection will start with the first table region.
* @param endKey select regions up to and including the region containing this row.
* If {@code null}, selection will continue through the last table region.
* @param callable this instance's {@link Batch.Call#call(Object)} method will be invoked once
* per table region, using the {@link Service} instance connected to that region.
* @param <T> the {@link Service} subclass to connect to
* @param <R> Return type for the {@code callable} parameter's
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} method
* @return a map of result values keyed by region name
*/
<T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
throws ServiceException, Throwable;
/**
* Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive),
* and invokes the passed {@link Batch.Call#call(Object)} method with each {@link Service}
* instance.
*
* <p>
* The given
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
* method will be called with the return value from each region's {@link Batch.Call#call(Object)}
* invocation.
*</p>
*
* @param service the protocol buffer {@code Service} implementation to call
* @param startKey start region selection with region containing this row. If {@code null}, the
* selection will start with the first table region.
* @param endKey select regions up to and including the region containing this row.
* If {@code null}, selection will continue through the last table region.
* @param callable this instance's {@link Batch.Call#call(Object)} method will be invoked once
* per table region, using the {@link Service} instance connected to that region.
* @param callback
* @param <T> the {@link Service} subclass to connect to
* @param <R> Return type for the {@code callable} parameter's
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} method
*/
<T extends Service, R> void coprocessorService(final Class<T> service,
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
final Batch.Callback<R> callback) throws ServiceException, Throwable;
/**
* See {@link #setAutoFlush(boolean, boolean)}
*

View File

@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
@ -492,6 +495,25 @@ public class HTablePool implements Closeable {
table.coprocessorExec(protocol, startKey, endKey, callable, callback);
}
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
return table.coprocessorService(row);
}
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
throws ServiceException, Throwable {
return table.coprocessorService(service, startKey, endKey, callable);
}
@Override
public <T extends Service, R> void coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
throws ServiceException, Throwable {
table.coprocessorService(service, startKey, endKey, callable, callback);
}
@Override
public String toString() {
return "PooledHTable{" + ", table=" + table + '}';

View File

@ -67,6 +67,7 @@ public abstract class Batch {
* @see Batch#forMethod(java.lang.reflect.Method, Object...)
* @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
*/
@Deprecated
public static <T extends CoprocessorProtocol,R> Call<T,R> forMethod(
final Class<T> protocol, final String method, final Object... args)
throws NoSuchMethodException {
@ -100,6 +101,7 @@ public abstract class Batch {
* return the results
* @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
*/
@Deprecated
public static <T extends CoprocessorProtocol,R> Call<T,R> forMethod(
final Method method, final Object... args) {
return new Call<T,R>() {

View File

@ -51,7 +51,10 @@ import java.lang.reflect.Method;
* @see ExecResult
* @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
* @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
* @deprecated since 0.96.0. See {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}
* or related methods instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Exec extends Invocation implements Row {

View File

@ -46,7 +46,10 @@ import java.io.Serializable;
* @see Exec
* @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
* @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
* @deprecated since 0.96.0. See {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}
* or related methods instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ExecResult implements Writable {

View File

@ -37,30 +37,42 @@ protocols.
<p>
In order to provide a custom RPC protocol to clients, a coprocessor implementation
defines an interface that extends {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}.
The interface can define any methods that the coprocessor wishes to expose.
Using this protocol, you can communicate with the coprocessor instances via
the {@link org.apache.hadoop.hbase.client.HTable#coprocessorProxy(Class, byte[])} and
{@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
must:
<ul>
<li>Define a protocol buffer Service and supporting Message types for the RPC methods.
See the
<a href="https://developers.google.com/protocol-buffers/docs/proto#services">protocol buffer guide</a>
for more details on defining services.</li>
<li>Generate the Service and Message code using the protoc compiler</li>
<li>Implement the generated Service interface in your coprocessor class and implement the
{@link org.apache.hadoop.hbase.coprocessor.CoprocessorService} interface. The
{@link org.apache.hadoop.hbase.coprocessor.CoprocessorService#getService()}
method should return a reference to the Endpoint's protocol buffer Service instance.
</ul>
Clients may then call the defined service methods on coprocessor instances via
the {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])},
{@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}, and
{@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
methods.
</p>
<p>
Since {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol} instances are
associated with individual regions within the table, the client RPC calls
must ultimately identify which regions should be used in the <code>CoprocessorProtocol</code>
Since coprocessor Service instances are associated with individual regions within the table,
the client RPC calls must ultimately identify which regions should be used in the Service
method invocations. Since regions are seldom handled directly in client code
and the region names may change over time, the coprocessor RPC calls use row keys
to identify which regions should be used for the method invocations. Clients
can call <code>CoprocessorProtocol</code> methods against either:
can call coprocessor Service methods against either:
<ul>
<li><strong>a single region</strong> - calling
{@link org.apache.hadoop.hbase.client.HTable#coprocessorProxy(Class, byte[])}
with a single row key. This returns a dynamic proxy of the <code>CoprocessorProtocol</code>
interface which uses the region containing the given row key (even if the
row does not exist) as the RPC endpoint.</li>
{@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}
with a single row key. This returns a {@link org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel}
instance which communicates with the region containing the given row key (even if the
row does not exist) as the RPC endpoint. Clients can then use the {@code CoprocessorRpcChannel}
instance in creating a new Service stub to call RPC methods on the region's coprocessor.</li>
<li><strong>a range of regions</strong> - calling
{@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
{@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}
or {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
with a starting row key and an ending row key. All regions in the table
from the region containing the start row key to the region containing the end
row key (inclusive), will we used as the RPC endpoints.</li>
@ -68,17 +80,16 @@ can call <code>CoprocessorProtocol</code> methods against either:
</p>
<p><em>Note that the row keys passed as parameters to the <code>HTable</code>
methods are not passed to the <code>CoprocessorProtocol</code> implementations.
methods are not passed directly to the coprocessor Service implementations.
They are only used to identify the regions for endpoints of the remote calls.
</em></p>
<p>
The {@link org.apache.hadoop.hbase.client.coprocessor.Batch} class defines two
interfaces used for <code>CoprocessorProtocol</code> invocations against
multiple regions. Clients implement {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} to
call methods of the actual <code>CoprocessorProtocol</code> instance. The interface's
<code>call()</code> method will be called once per selected region, passing the
<code>CoprocessorProtocol</code> instance for the region as a parameter. Clients
interfaces used for coprocessor Service invocations against multiple regions. Clients implement
{@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} to call methods of the actual
coprocessor Service instance. The interface's <code>call()</code> method will be called once
per selected region, passing the Service instance for the region as a parameter. Clients
can optionally implement {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback}
to be notified of the results from each region invocation as they complete.
The instance's {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
@ -88,112 +99,128 @@ return value from each region.
<h2><a name="usage">Example usage</a></h2>
<p>
To start with, let's use a fictitious coprocessor, <code>RowCountCoprocessor</code>
To start with, let's use a fictitious coprocessor, <code>RowCountEndpoint</code>
that counts the number of rows and key-values in each region where it is running.
For clients to query this information, the coprocessor defines and implements
the following {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol} extension
interface:
For clients to query this information, the coprocessor defines the following protocol buffer
service:
</p>
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
public interface RowCountProtocol extends CoprocessorProtocol {
long getRowCount();
long getRowCount(Filter filt);
long getKeyValueCount();
message CountRequest {
}
message CountResponse {
required int64 count = 1 [default = 0];
}
service RowCountService {
rpc getRowCount(CountRequest)
returns (CountResponse);
rpc getKeyValueCount(CountRequest)
returns (CountResponse);
}
</pre></blockquote></div>
<p>
Now we need a way to access the results that <code>RowCountCoprocessor</code>
Next run the protoc compiler on the .proto file to generate Java code for the Service interface.
The generated {@code RowCountService} interface should look something like:
</p>
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
public static abstract class RowCountService
implements com.google.protobuf.Service {
...
public interface Interface {
public abstract void getRowCount(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountResponse> done);
public abstract void getKeyValueCount(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos.CountResponse> done);
}
}
</pre></blockquote></div>
<p>
Our coprocessor Service will need to implement this interface and the {@link org.apache.hadoop.hbase.coprocessor.CoprocessorService}
in order to be registered correctly as an endpoint. For the sake of simplicity the server-side
implementation is omitted. To see the implementing code, please see the
{@link org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint} class in the HBase source code.
</p>
<p>
Now we need a way to access the results that <code>RowCountService</code>
is making available. If we want to find the row count for all regions, we could
use:
</p>
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
HTable table = new HTable("mytable");
// find row count keyed by region name
Map<byte[],Long> results = table.coprocessorExec(
RowCountProtocol.class, // the protocol interface we're invoking
null, null, // start and end row keys
new Batch.Call<RowCountProtocol,Long>() {
public Long call(RowCountProtocol counter) {
return counter.getRowCount();
}
});
HTable table = new HTable(conf, "mytable");
final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
Map<byte[],Long> results = table.coprocessorService(
ExampleProtos.RowCountService.class, // the protocol interface we're invoking
null, null, // start and end row keys
new Batch.Call<ExampleProtos.RowCountService,Long>() {
public Long call(ExampleProtos.RowCountService counter) throws IOException {
BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback =
new BlockingRpcCallback<ExampleProtos.CountResponse>();
counter.getRowCount(null, request, rpcCallback);
ExampleProtos.CountResponse response = rpcCallback.get();
return response.hasCount() ? response.getCount() : 0;
}
});
</pre></blockquote></div>
<p>
This will return a <code>java.util.Map</code> of the <code>counter.getRowCount()</code>
result for the <code>RowCountCoprocessor</code> instance running in each region
result for the <code>RowCountService</code> instance running in each region
of <code>mytable</code>, keyed by the region name.
</p>
<p>
By implementing {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
as an anonymous class, we can invoke <code>RowCountProtocol</code> methods
as an anonymous class, we can invoke <code>RowCountService</code> methods
directly against the {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
method's argument. Calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}
method's argument. Calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}
will take care of invoking <code>Batch.Call.call()</code> against our anonymous class
with the <code>RowCountCoprocessor</code> instance for each table region.
with the <code>RowCountService</code> instance for each table region.
</p>
<p>
For this simple case, where we only want to obtain the result from a single
<code>CoprocessorProtocol</code> method, there's also a bit of syntactic sugar
we can use to cut down on the amount of code required:
</p>
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
HTable table = new HTable("mytable");
Batch.Call<RowCountProtocol,Long> call = Batch.forMethod(RowCountProtocol.class, "getRowCount");
Map<byte[],Long> results = table.coprocessorExec(RowCountProtocol.class, null, null, call);
</pre></blockquote></div>
<p>
{@link org.apache.hadoop.hbase.client.coprocessor.Batch#forMethod(Class, String, Object...)}
is a simple factory method that will return a {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
instance that will call <code>RowCountProtocol.getRowCount()</code> for us
using reflection.
</p>
<p>
However, if you want to perform additional processing on the results,
implementing {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
directly will provide more power and flexibility. For example, if you would
Implementing {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} also allows you to
perform additional processing against each region's Service instance. For example, if you would
like to combine row count and key-value count for each region:
</p>
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
HTable table = new HTable("mytable");
HTable table = new HTable(conf, "mytable");
// combine row count and kv count for region
Map<byte[],Pair<Long,Long>> results = table.coprocessorExec(
RowCountProtocol.class,
null, null,
new Batch.Call<RowCountProtocol,Pair<Long,Long>>() {
public Pair<Long,Long> call(RowCountProtocol counter) {
return new Pair(counter.getRowCount(), counter.getKeyValueCount());
}
});
</pre></blockquote></div>
final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
Map<byte[],Long> results = table.coprocessorService(
ExampleProtos.RowCountService.class, // the protocol interface we're invoking
null, null, // start and end row keys
new Batch.Call<ExampleProtos.RowCountService,Pair<Long,Long>>() {
public Long call(ExampleProtos.RowCountService counter) throws IOException {
BlockingRpcCallback<ExampleProtos.CountResponse> rowCallback =
new BlockingRpcCallback<ExampleProtos.CountResponse>();
counter.getRowCount(null, request, rowCallback);
<p>
Similarly, you could average the number of key-values per row for each region:
</p>
BlockingRpcCallback<ExampleProtos.CountResponse> kvCallback =
new BlockingRpcCallback<ExampleProtos.CountResponse>();
counter.getKeyValueCount(null, request, kvCallback);
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
Map<byte[],Double> results = table.coprocessorExec(
RowCountProtocol.class,
null, null,
new Batch.Call<RowCountProtocol,Double>() {
public Double call(RowCountProtocol counter) {
return ((double)counter.getKeyValueCount()) / ((double)counter.getRowCount());
}
});
ExampleProtos.CountResponse rowResponse = rowCallback.get();
ExampleProtos.CountResponse kvResponse = kvCallback.get();
return new Pair(rowResponse.hasCount() ? rowResponse.getCount() : 0,
kvResponse.hasCount() ? kvResponse.getCount() : 0);
}
});
</pre></blockquote></div>
*/
package org.apache.hadoop.hbase.client.coprocessor;

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.coprocessor;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -34,6 +36,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
import org.apache.hadoop.hbase.util.VersionInfo;
@ -250,6 +253,11 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
*/
public E loadInstance(Class<?> implClass, int priority, Configuration conf)
throws IOException {
if (!Coprocessor.class.isAssignableFrom(implClass)) {
throw new IOException("Configured class " + implClass.getName() + " must implement "
+ Coprocessor.class.getName() + " interface ");
}
// create the instance
Coprocessor impl;
Object o = null;
@ -435,7 +443,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
byte[] qualifier, long amount, boolean writeToWAL)
throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount,
writeToWAL);
writeToWAL);
}
@Override
@ -536,6 +544,25 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
return table.coprocessorProxy(protocol, row);
}
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
return table.coprocessorService(row);
}
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
throws ServiceException, Throwable {
return table.coprocessorService(service, startKey, endKey, callable);
}
@Override
public <T extends Service, R> void coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
throws ServiceException, Throwable {
table.coprocessorService(service, startKey, endKey, callable, callback);
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
table.mutateRow(rm);

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import com.google.protobuf.Service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Coprocessor endpoints providing protobuf services should implement this
* interface and return the {@link Service} instance via {@link #getService()}.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface CoprocessorService {
public Service getService();
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor.example;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Sample coprocessor endpoint exposing a Service interface for counting rows and key values.
*
* <p>
* For the protocol buffer definition of the RowCountService, see the source file located under
* hbase-server/src/main/protobuf/Examples.proto.
* </p>
*/
public class RowCountEndpoint extends ExampleProtos.RowCountService
implements Coprocessor, CoprocessorService {
private RegionCoprocessorEnvironment env;
public RowCountEndpoint() {
}
/**
* Just returns a reference to this object, which implements the RowCounterService interface.
*/
@Override
public Service getService() {
return this;
}
/**
* Returns a count of the rows in the region where this coprocessor is loaded.
*/
@Override
public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
RpcCallback<ExampleProtos.CountResponse> done) {
Scan scan = new Scan();
scan.setFilter(new FirstKeyOnlyFilter());
ExampleProtos.CountResponse response = null;
InternalScanner scanner = null;
try {
scanner = env.getRegion().getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMore = false;
byte[] lastRow = null;
long count = 0;
do {
hasMore = scanner.next(results);
for (KeyValue kv : results) {
byte[] currentRow = kv.getRow();
if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
lastRow = currentRow;
count++;
}
}
results.clear();
} while (hasMore);
response = ExampleProtos.CountResponse.newBuilder()
.setCount(count).build();
} catch (IOException ioe) {
ResponseConverter.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
/**
* Returns a count of all KeyValues in the region where this coprocessor is loaded.
*/
@Override
public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request,
RpcCallback<ExampleProtos.CountResponse> done) {
ExampleProtos.CountResponse response = null;
InternalScanner scanner = null;
try {
scanner = env.getRegion().getScanner(new Scan());
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMore = false;
long count = 0;
do {
hasMore = scanner.next(results);
for (KeyValue kv : results) {
count++;
}
results.clear();
} while (hasMore);
response = ExampleProtos.CountResponse.newBuilder()
.setCount(count).build();
} catch (IOException ioe) {
ResponseConverter.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
/**
* Stores a reference to the coprocessor environment provided by the
* {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
* coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
* on a table region, so always expects this to be an instance of
* {@link RegionCoprocessorEnvironment}.
* @param env the environment provided by the coprocessor host
* @throws IOException if the provided environment is not an instance of
* {@code RegionCoprocessorEnvironment}
*/
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
}
}

View File

@ -171,117 +171,37 @@ public class AccessControlCoprocessor extends BaseRegionObserverCoprocessor {
<h2><a name="commandtarget">Endpoint</a></h2>
<code>Coprocessor</code> and <code>RegionObserver</code> provide certain hooks
for injecting user code running at each region. The user code will be triggerd
for injecting user code running at each region. The user code will be triggered
by existing <code>HTable</code> and <code>HBaseAdmin</code> operations at
the certain hook points.
<p>
Through Endpoint and dynamic RPC protocol, you can define your own
interface communicated between client and region server,
i.e., you can create a new method, specify passed parameters and return types
for this new method.
And the new Endpoint methods can be triggered by
calling client side dynamic RPC functions -- <code>HTable.coprocessorExec(...)
Coprocessor Endpoints allow you to define your own dynamic RPC protocol to communicate
between clients and region servers, i.e., you can create a new method, specifying custom
request parameters and return types. RPC methods exposed by coprocessor Endpoints can be
triggered by calling client side dynamic RPC functions -- <code>HTable.coprocessorService(...)
</code>.
<p>
To implement a Endpoint, you need to:
To implement an Endpoint, you need to:
<ul>
<li>Extend <code>CoprocessorProtocol</code>: the interface defines
communication protocol for the new Endpoint, and will be
served as the RPC protocol between client and region server.</li>
<li>Extend both <code>BaseEndpointCoprocessor</code> abstract class,
and the above extended <code>CoprocessorProtocol</code> interface:
the actually implemented class running at region server.</li>
<li>Define a protocol buffer Service and supporting Message types for the RPC methods.
See the
<a href="https://developers.google.com/protocol-buffers/docs/proto#services">protocol buffer guide</a>
for more details on defining services.</li>
<li>Generate the Service and Message code using the protoc compiler</li>
<li>Implement the generated Service interface in your coprocessor class and implement the
<code>CoprocessorService</code> interface. The <code>CoprocessorService.getService()</code>
method should return a reference to the Endpoint's protocol buffer Service instance.
</ul>
<p>
Here's an example of performing column aggregation at region server:
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
// A sample protocol for performing aggregation at regions.
public static interface ColumnAggregationProtocol
extends CoprocessorProtocol {
// Perform aggregation for a given column at the region. The aggregation
// will include all the rows inside the region. It can be extended to
// allow passing start and end rows for a fine-grained aggregation.
public int sum(byte[] family, byte[] qualifier) throws IOException;
}
// Aggregation implementation at a region.
public static class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
implements ColumnAggregationProtocol {
// @Override
// Scan the region by the given family and qualifier. Return the aggregation
// result.
public int sum(byte[] family, byte[] qualifier)
throws IOException {
// aggregate at each region
Scan scan = new Scan();
scan.addColumn(family, qualifier);
int sumResult = 0;
// use an internal scanner to perform scanning.
InternalScanner scanner = getEnvironment().getRegion().getScanner(scan);
try {
List&lt;KeyValue&gt; curVals = new ArrayList&lt;KeyValue&gt;();
boolean done = false;
do {
curVals.clear();
done = scanner.next(curVals);
KeyValue kv = curVals.get(0);
sumResult += Bytes.toInt(kv.getValue());
} while (done);
} finally {
scanner.close();
}
return sumResult;
}
}
</pre></blockquote>
</div>
<p>
Client invocations are performed through <code>HTable</code>,
which has the following methods added by dynamic RPC protocol:
For a more detailed discussion of how to implement a coprocessor Endpoint, along with some sample
code, see the {@link org.apache.hadoop.hbase.client.coprocessor} package documentation.
</p>
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
public &lt;T extends CoprocessorProtocol&gt; T coprocessorProxy(Class&lt;T&gt; protocol, Row row)
public &lt;T extends CoprocessorProtocol, R&gt; void coprocessorExec(
Class&lt;T&gt; protocol, List&lt;? extends Row&gt; rows,
BatchCall&lt;T,R&gt; callable, BatchCallback&lt;R&gt; callback)
public &lt;T extends CoprocessorProtocol, R&gt; void coprocessorExec(
Class&lt;T&gt; protocol, RowRange range,
BatchCall&lt;T,R&gt; callable, BatchCallback&lt;R&gt; callback)
</pre></blockquote>
</div>
<p>
Here is a client side example of invoking
<code>ColumnAggregationEndpoint</code>:
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Scan scan;
Map&lt;byte[], Integer&gt; results;
// scan: for all regions
scan = new Scan();
results = table.coprocessorExec(ColumnAggregationProtocol.class, scan,
new BatchCall&lt;ColumnAggregationProtocol,Integer&gt;() {
public Integer call(ColumnAggregationProtocol instance) throws IOException{
return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
}
});
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry&lt;byte[], Integer&gt; e : results.entrySet()) {
sumResult += e.getValue();
}
</pre></blockquote>
</div>
<h2><a name="load">Coprocess loading</a></h2>
<h2><a name="load">Coprocessor loading</a></h2>
A customized coprocessor can be loaded by two different ways, by configuration,
or by <code>HTableDescriptor</code> for a newly created table.
<p>
(Currently we don't really have an on demand coprocessor loading machanism for
(Currently we don't really have an on demand coprocessor loading mechanism for
opened regions.)
<h3>Load from configuration</h3>
Whenever a region is opened, it will read coprocessor class names from
@ -294,7 +214,7 @@ default coprocessors. The classes must be included in the classpath already.
<blockquote><pre>
&lt;property&gt;
&lt;name&gt;hbase.coprocessor.region.classes&lt;/name&gt;
&lt;value&gt;org.apache.hadoop.hbase.coprocessor.AccessControllCoprocessor, org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol&lt;/value&gt;
&lt;value&gt;org.apache.hadoop.hbase.coprocessor.AccessControlCoprocessor, org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol&lt;/value&gt;
&lt;description&gt;A comma-separated list of Coprocessors that are loaded by
default. For any override coprocessor method from RegionObservor or
Coprocessor, these classes' implementation will be called

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.RpcCallback;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
import java.io.InterruptedIOException;
/**
* Simple {@link RpcCallback} implementation providing a
* {@link java.util.concurrent.Future}-like {@link BlockingRpcCallback#get()} method, which
* will block util the instance's {@link BlockingRpcCallback#run(Object)} method has been called.
* {@code R} is the RPC response type that will be passed to the {@link #run(Object)} method.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BlockingRpcCallback<R> implements RpcCallback<R> {
private R result;
private boolean resultSet = false;
/**
* Called on completion of the RPC call with the response object, or {@code null} in the case of
* an error.
* @param parameter the response object or {@code null} if an error occurred
*/
@Override
public void run(R parameter) {
synchronized (this) {
result = parameter;
resultSet = true;
this.notify();
}
}
/**
* Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was
* passed. When used asynchronously, this method will block until the {@link #run(Object)}
* method has been called.
* @return the response object or {@code null} if no response was passed
*/
public synchronized R get() throws IOException {
while (!resultSet) {
try {
this.wait();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
InterruptedIOException exception = new InterruptedIOException(ie.getMessage());
exception.initCause(ie);
throw exception;
}
}
return result;
}
}

View File

@ -35,9 +35,12 @@ import org.apache.hadoop.classification.InterfaceStability;
* <li>an array or {@code java.util.List} of one of the above</li>
* </ul>
* </p>
* @deprecated since 0.96. Use {@link org.apache.hadoop.hbase.coprocessor.CoprocessorService}
* instead.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@Deprecated
public interface CoprocessorProtocol extends VersionedProtocol {
public static final long VERSION = 1L;
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
/**
* Provides clients with an RPC connection to call coprocessor endpoint {@link Service}s
* against a given table region. An instance of this class may be obtained
* by calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])},
* but should normally only be used in creating a new {@link Service} stub to call the endpoint
* methods.
* @see org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])
*/
@InterfaceAudience.Private
public class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel {
private static Log LOG = LogFactory.getLog(CoprocessorRpcChannel.class);
private final HConnection connection;
private final byte[] table;
private final byte[] row;
private byte[] lastRegion;
public CoprocessorRpcChannel(HConnection conn, byte[] table, byte[] row) {
this.connection = conn;
this.table = table;
this.row = row;
}
@Override
public void callMethod(Descriptors.MethodDescriptor method,
RpcController controller,
Message request, Message responsePrototype,
RpcCallback<Message> callback) {
Message response = null;
try {
response = callExecService(method, request, responsePrototype);
} catch (IOException ioe) {
LOG.warn("Call failed on IOException", ioe);
ResponseConverter.setControllerException(controller, ioe);
}
if (callback != null) {
callback.run(response);
}
}
@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor method,
RpcController controller,
Message request, Message responsePrototype)
throws ServiceException {
try {
return callExecService(method, request, responsePrototype);
} catch (IOException ioe) {
throw new ServiceException("Error calling method "+method.getFullName(), ioe);
}
}
private Message callExecService(Descriptors.MethodDescriptor method,
Message request, Message responsePrototype)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Call: "+method.getName()+", "+request.toString());
}
if (row == null) {
throw new IllegalArgumentException("Missing row property for remote region location");
}
final ClientProtos.CoprocessorServiceCall call =
ClientProtos.CoprocessorServiceCall.newBuilder()
.setRow(ByteString.copyFrom(row))
.setServiceName(method.getService().getFullName())
.setMethodName(method.getName())
.setRequest(request.toByteString()).build();
ServerCallable<ClientProtos.CoprocessorServiceResponse> callable =
new ServerCallable<ClientProtos.CoprocessorServiceResponse>(connection, table, row) {
public CoprocessorServiceResponse call() throws Exception {
byte[] regionName = location.getRegionInfo().getRegionName();
return ProtobufUtil.execService(server, call, regionName);
}
};
CoprocessorServiceResponse result = callable.withRetries();
Message response = null;
if (result.getValue().hasValue()) {
response = responsePrototype.newBuilderForType()
.mergeFrom(result.getValue().getValue()).build();
} else {
response = responsePrototype.getDefaultInstanceForType();
}
lastRegion = result.getRegion().getValue().toByteArray();
if (LOG.isTraceEnabled()) {
LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
}
return response;
}
public byte[] getLastRegion() {
return lastRegion;
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* {@link org.apache.hadoop.hbase.client.ServerCallable} instance).
*/
@InterfaceAudience.Private
@Deprecated
public class ExecRPCInvoker implements InvocationHandler {
// LOG is NOT in hbase subpackage intentionally so that the default HBase
// DEBUG log level does NOT emit RPC-level logging.
@ -84,6 +85,8 @@ public class ExecRPCInvoker implements InvocationHandler {
LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
", value="+result.getValue());
return result.getValue();
} else if (LOG.isDebugEnabled()) {
LOG.debug("Null row passed for call");
}
return null;

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
/**
* Used for server-side protobuf RPC service invocations. This handler allows
* invocation exceptions to easily be passed through to the RPC server from coprocessor
* {@link Service} implementations.
*
* <p>
* When implementing {@link Service} defined methods, coprocessor endpoints can use the following
* pattern to pass exceptions back to the RPC client:
* <code>
* public void myMethod(RpcController controller, MyRequest request, RpcCallback<MyResponse> done) {
* MyResponse response = null;
* try {
* // do processing
* response = MyResponse.getDefaultInstance(); // or use a new builder to populate the response
* } catch (IOException ioe) {
* // pass exception back up
* ResponseConverter.setControllerException(controller, ioe);
* }
* done.run(response);
* }
* </code>
* </p>
*/
public class ServerRpcController implements RpcController {
/**
* The exception thrown within
* {@link Service#callMethod(Descriptors.MethodDescriptor, RpcController, Message, RpcCallback)},
* if any.
*/
// TODO: it would be good widen this to just Throwable, but IOException is what we allow now
private IOException serviceException;
private String errorMessage;
@Override
public void reset() {
serviceException = null;
errorMessage = null;
}
@Override
public boolean failed() {
return (failedOnException() || errorMessage != null);
}
@Override
public String errorText() {
return errorMessage;
}
@Override
public void startCancel() {
// not implemented
}
@Override
public void setFailed(String message) {
errorMessage = message;
}
@Override
public boolean isCanceled() {
return false;
}
@Override
public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) {
// not implemented
}
/**
* Sets an exception to be communicated back to the {@link Service} client.
* @param ioe the exception encountered during execution of the service method
*/
public void setFailedOn(IOException ioe) {
serviceException = ioe;
setFailed(StringUtils.stringifyException(ioe));
}
/**
* Returns any exception thrown during service method invocation, or {@code null} if no exception
* was thrown. This can be used by clients to receive exceptions generated by RPC calls, even
* when {@link RpcCallback}s are used and no {@link com.google.protobuf.ServiceException} is
* declared.
*/
public IOException getFailedOn() {
return serviceException;
}
/**
* Returns whether or not a server exception was generated in the prior RPC invocation.
*/
public boolean failedOnException() {
return serviceException != null;
}
}

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@ -93,6 +94,9 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@ -120,6 +124,7 @@ import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.TablePermission;
import org.apache.hadoop.hbase.security.access.UserPermission;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.collect.ArrayListMultimap;
@ -127,8 +132,12 @@ import com.google.common.collect.ListMultimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.*;
/**
* Protobufs utility.
*/
@ -1306,6 +1315,26 @@ public final class ProtobufUtil {
}
}
public static CoprocessorServiceResponse execService(final ClientProtocol client,
final CoprocessorServiceCall call, final byte[] regionName) throws IOException {
CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder()
.setCall(call).setRegion(
RequestConverter.buildRegionSpecifier(REGION_NAME, regionName)).build();
try {
CoprocessorServiceResponse response =
client.execService(null, request);
return response;
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
public static <T extends Service> T newServiceStub(Class<T> service, RpcChannel channel)
throws Exception {
return (T)Methods.call(service, null, "newStub",
new Class[]{ RpcChannel.class }, new Object[]{ channel });
}
// End helpers for Client
// Start helpers for Admin
@ -1609,7 +1638,7 @@ public final class ProtobufUtil {
/**
* Convert a client Permission to a Permission proto
*
* @param action the client Permission
* @param perm the client Permission
* @return the protobuf Permission
*/
public static AccessControlProtos.Permission toPermission(Permission perm) {
@ -1650,7 +1679,7 @@ public final class ProtobufUtil {
/**
* Converts a Permission.Action proto to a client Permission.Action object.
*
* @param proto the protobuf Action
* @param action the protobuf Action
* @return the converted Action
*/
public static Permission.Action toPermissionAction(
@ -1789,4 +1818,21 @@ public final class ProtobufUtil {
return perms;
}
/**
* Unwraps an exception from a protobuf service into the underlying (expected) IOException.
* This method will <strong>always</strong> throw an exception.
* @param se the {@code ServiceException} instance to convert into an {@code IOException}
*/
public static void toIOException(ServiceException se) throws IOException {
if (se == null) {
throw new NullPointerException("Null service exception passed!");
}
Throwable cause = se.getCause();
if (cause != null && cause instanceof IOException) {
throw (IOException)cause;
}
throw new IOException(se);
}
}

View File

@ -21,10 +21,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.protobuf.RpcController;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
@ -40,10 +43,13 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanR
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.security.access.UserPermission;
import org.apache.hadoop.util.StringUtils;
import com.google.protobuf.ByteString;
import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.UserPermissionsResponse;
/**
* Helper utility to build protocol buffer responses,
* or retrieve data from protocol buffer responses.
@ -117,6 +123,18 @@ public final class ResponseConverter {
return builder.build();
}
/**
* Converts the permissions list into a protocol buffer UserPermissionsResponse
*/
public static UserPermissionsResponse buildUserPermissionsResponse(
final List<UserPermission> permissions) {
UserPermissionsResponse.Builder builder = UserPermissionsResponse.newBuilder();
for (UserPermission perm : permissions) {
builder.addPermission(ProtobufUtil.toUserPermission(perm));
}
return builder.build();
}
// End utilities for Client
// Start utilities for Admin
@ -249,4 +267,19 @@ public final class ResponseConverter {
return GetLastFlushedSequenceIdResponse.newBuilder().setLastFlushedSequenceId(seqId).build();
}
/**
* Stores an exception encountered during RPC invocation so it can be passed back
* through to the client.
* @param controller the controller instance provided by the client when calling the service
* @param ioe the exception encountered
*/
public static void setControllerException(RpcController controller, IOException ioe) {
if (controller != null) {
if (controller instanceof ServerRpcController) {
((ServerRpcController)controller).setFailedOn(ioe);
} else {
controller.setFailed(StringUtils.stringifyException(ioe));
}
}
}
}

View File

@ -61,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.protobuf.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -115,6 +116,7 @@ import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.metrics.OperationMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
@ -143,6 +145,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MutableClassToInstanceMap;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
/**
* HRegion stores data for a certain region of a table. It stores all columns
* for each row. A given table consists of one or more HRegions.
@ -215,6 +219,9 @@ public class HRegion implements HeapSize { // , Writable{
private Map<String, Class<? extends CoprocessorProtocol>>
protocolHandlerNames = Maps.newHashMap();
// TODO: account for each registered handler in HeapSize computation
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
/**
* Temporary subdirectory of the region directory used for compaction output.
*/
@ -5027,7 +5034,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
36 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
37 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
(7 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);
@ -5085,6 +5092,7 @@ public class HRegion implements HeapSize { // , Writable{
* @return {@code true} if the registration was successful, {@code false}
* otherwise
*/
@Deprecated
public <T extends CoprocessorProtocol> boolean registerProtocol(
Class<T> protocol, T handler) {
@ -5108,6 +5116,41 @@ public class HRegion implements HeapSize { // , Writable{
return true;
}
/**
* Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
* be available for handling
* {@link HRegion#execService(com.google.protobuf.RpcController, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)}} calls.
*
* <p>
* Only a single instance may be registered per region for a given {@link Service} subclass (the
* instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
* After the first registration, subsequent calls with the same service name will fail with
* a return value of {@code false}.
* </p>
* @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
* @return {@code true} if the registration was successful, {@code false}
* otherwise
*/
public boolean registerService(Service instance) {
/*
* No stacking of instances is allowed for a single service name
*/
Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
LOG.error("Coprocessor service "+serviceDesc.getFullName()+
" already registered, rejecting request from "+instance
);
return false;
}
coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
if (LOG.isDebugEnabled()) {
LOG.debug("Registered coprocessor service: region="+
Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
}
return true;
}
/**
* Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
* method using the registered protocol handlers.
@ -5123,6 +5166,7 @@ public class HRegion implements HeapSize { // , Writable{
* occurs during the invocation
* @see org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)
*/
@Deprecated
public ExecResult exec(Exec call)
throws IOException {
Class<? extends CoprocessorProtocol> protocol = call.getProtocol();
@ -5174,6 +5218,55 @@ public class HRegion implements HeapSize { // , Writable{
return new ExecResult(getRegionName(), value);
}
/**
* Executes a single protocol buffer coprocessor endpoint {@link Service} method using
* the registered protocol handlers. {@link Service} implementations must be registered via the
* {@link org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)}
* method before they are available.
*
* @param controller an {@code RpcContoller} implementation to pass to the invoked service
* @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
* and parameters for the method invocation
* @return a protocol buffer {@code Message} instance containing the method's result
* @throws IOException if no registered service handler is found or an error
* occurs during the invocation
* @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
*/
public Message execService(RpcController controller, CoprocessorServiceCall call)
throws IOException {
String serviceName = call.getServiceName();
String methodName = call.getMethodName();
if (!coprocessorServiceHandlers.containsKey(serviceName)) {
throw new HBaseRPC.UnknownProtocolException(null,
"No registered coprocessor service found for name "+serviceName+
" in region "+Bytes.toStringBinary(getRegionName()));
}
Service service = coprocessorServiceHandlers.get(serviceName);
Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
if (methodDesc == null) {
throw new HBaseRPC.UnknownProtocolException(service.getClass(),
"Unknown method "+methodName+" called on service "+serviceName+
" in region "+Bytes.toStringBinary(getRegionName()));
}
Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
.mergeFrom(call.getRequest()).build();
final Message.Builder responseBuilder =
service.getResponsePrototype(methodDesc).newBuilderForType();
service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
@Override
public void run(Message message) {
if (message != null) {
responseBuilder.mergeFrom(message);
}
}
});
return responseBuilder.build();
}
/*
* Process table.
* Do major compaction or list content.

View File

@ -54,6 +54,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
import com.google.protobuf.Message;
import org.apache.commons.lang.mutable.MutableDouble;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -115,6 +116,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@ -230,6 +232,9 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
@ -3333,6 +3338,31 @@ public class HRegionServer implements ClientProtocol,
}
}
@Override
public CoprocessorServiceResponse execService(final RpcController controller,
final CoprocessorServiceRequest request) throws ServiceException {
try {
requestCount.incrementAndGet();
HRegion region = getRegion(request.getRegion());
// ignore the passed in controller (from the serialized call)
ServerRpcController execController = new ServerRpcController();
Message result = region.execService(execController, request.getCall());
if (execController.getFailedOn() != null) {
throw execController.getFailedOn();
}
CoprocessorServiceResponse.Builder builder =
CoprocessorServiceResponse.newBuilder();
builder.setRegion(RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, region.getRegionName()));
builder.setValue(
builder.getValueBuilder().setName(result.getClass().getName())
.setValue(result.toByteString()));
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
/**
* Execute multiple actions on a table: get, mutate, and/or execCoprocessor
*

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
@ -216,7 +217,11 @@ public class RegionCoprocessorHost
for (Class c : implClass.getInterfaces()) {
if (CoprocessorProtocol.class.isAssignableFrom(c)) {
region.registerProtocol(c, (CoprocessorProtocol)instance);
break;
}
// we allow endpoints to register as both CoproocessorProtocols and Services
// for ease of transition
if (CoprocessorService.class.isAssignableFrom(c)) {
region.registerService( ((CoprocessorService)instance).getService() );
}
}
ConcurrentMap<String, Object> classData;

View File

@ -28,12 +28,15 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@ -732,6 +735,25 @@ public class RemoteHTable implements HTableInterface {
throw new UnsupportedOperationException("coprocessorExec not implemented");
}
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
throw new UnsupportedOperationException("coprocessorService not implemented");
}
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
throws ServiceException, Throwable {
throw new UnsupportedOperationException("coprocessorService not implemented");
}
@Override
public <T extends Service, R> void coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
throws ServiceException, Throwable {
throw new UnsupportedOperationException("coprocessorService not implemented");
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
throw new IOException("atomicMutation not supported");

View File

@ -24,6 +24,9 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -40,18 +43,16 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@ -69,6 +70,8 @@ import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
/**
* Provides basic authorization checks for data access and administrative
* operations.
@ -101,7 +104,8 @@ import com.google.common.collect.Sets;
* </p>
*/
public class AccessController extends BaseRegionObserver
implements MasterObserver, AccessControllerProtocol {
implements MasterObserver, AccessControllerProtocol,
AccessControlService.Interface, CoprocessorService {
/**
* Represents the result of an authorization check for logging and error
* reporting.
@ -1049,6 +1053,7 @@ public class AccessController extends BaseRegionObserver
* These methods are only allowed to be called against the _acl_ region(s).
* This will be restricted by both client side and endpoint implementations.
*/
@Deprecated
@Override
public void grant(UserPermission perm) throws IOException {
// verify it's only running at .acl.
@ -1079,6 +1084,7 @@ public class AccessController extends BaseRegionObserver
permission.getActions()));
}
@Deprecated
@Override
public void revoke(UserPermission perm) throws IOException {
// only allowed to be called on _acl_ region
@ -1109,6 +1115,7 @@ public class AccessController extends BaseRegionObserver
permission.getActions()));
}
@Deprecated
@Override
public List<UserPermission> getUserPermissions(final byte[] tableName) throws IOException {
// only allowed to be called on _acl_ region
@ -1124,6 +1131,7 @@ public class AccessController extends BaseRegionObserver
}
}
@Deprecated
@Override
public void checkPermissions(Permission[] permissions) throws IOException {
byte[] tableName = regionEnv.getRegion().getTableDesc().getName();
@ -1158,11 +1166,13 @@ public class AccessController extends BaseRegionObserver
}
}
@Deprecated
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return PROTOCOL_VERSION;
}
@Deprecated
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
@ -1173,6 +1183,79 @@ public class AccessController extends BaseRegionObserver
"Unexpected protocol requested: "+protocol);
}
/* ---- Protobuf AccessControlService implementation ---- */
@Override
public void grant(RpcController controller,
AccessControlProtos.GrantRequest request,
RpcCallback<AccessControlProtos.GrantResponse> done) {
UserPermission perm = ProtobufUtil.toUserPermission(request.getPermission());
AccessControlProtos.GrantResponse response = null;
try {
grant(perm);
response = AccessControlProtos.GrantResponse.getDefaultInstance();
} catch (IOException ioe) {
// pass exception back up
ResponseConverter.setControllerException(controller, ioe);
}
done.run(response);
}
@Override
public void revoke(RpcController controller,
AccessControlProtos.RevokeRequest request,
RpcCallback<AccessControlProtos.RevokeResponse> done) {
UserPermission perm = ProtobufUtil.toUserPermission(request.getPermission());
AccessControlProtos.RevokeResponse response = null;
try {
revoke(perm);
response = AccessControlProtos.RevokeResponse.getDefaultInstance();
} catch (IOException ioe) {
// pass exception back up
ResponseConverter.setControllerException(controller, ioe);
}
done.run(response);
}
@Override
public void getUserPermissions(RpcController controller,
AccessControlProtos.UserPermissionsRequest request,
RpcCallback<AccessControlProtos.UserPermissionsResponse> done) {
byte[] table = request.getTable().toByteArray();
AccessControlProtos.UserPermissionsResponse response = null;
try {
List<UserPermission> perms = getUserPermissions(table);
response = ResponseConverter.buildUserPermissionsResponse(perms);
} catch (IOException ioe) {
// pass exception back up
ResponseConverter.setControllerException(controller, ioe);
}
done.run(response);
}
@Override
public void checkPermissions(RpcController controller,
AccessControlProtos.CheckPermissionsRequest request,
RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
Permission[] perms = new Permission[request.getPermissionCount()];
for (int i=0; i < request.getPermissionCount(); i++) {
perms[i] = ProtobufUtil.toPermission(request.getPermission(i));
}
AccessControlProtos.CheckPermissionsResponse response = null;
try {
checkPermissions(perms);
response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
} catch (IOException ioe) {
ResponseConverter.setControllerException(controller, ioe);
}
done.run(response);
}
@Override
public Service getService() {
return AccessControlProtos.AccessControlService.newReflectiveService(this);
}
private byte[] getTableName(RegionCoprocessorEnvironment e) {
HRegion region = e.getRegion();
byte[] tableName = null;

View File

@ -52,3 +52,48 @@ message UserTablePermissions {
repeated UserPermissions permissions = 1;
}
message GrantRequest {
required UserPermission permission = 1;
}
message GrantResponse {
}
message RevokeRequest {
required UserPermission permission = 1;
}
message RevokeResponse {
}
message UserPermissionsRequest {
required bytes table = 1;
}
message UserPermissionsResponse {
repeated UserPermission permission = 1;
}
message CheckPermissionsRequest {
repeated Permission permission = 1;
}
message CheckPermissionsResponse {
}
service AccessControlService {
rpc grant(GrantRequest)
returns (GrantResponse);
rpc revoke(RevokeRequest)
returns (RevokeResponse);
rpc getUserPermissions(UserPermissionsRequest)
returns (UserPermissionsResponse);
rpc checkPermissions(CheckPermissionsRequest)
returns (CheckPermissionsResponse);
}

View File

@ -296,6 +296,23 @@ message ExecCoprocessorResponse {
required NameBytesPair value = 1;
}
message CoprocessorServiceCall {
required bytes row = 1;
required string serviceName = 2;
required string methodName = 3;
required bytes request = 4;
}
message CoprocessorServiceRequest {
required RegionSpecifier region = 1;
required CoprocessorServiceCall call = 2;
}
message CoprocessorServiceResponse {
required RegionSpecifier region = 1;
required NameBytesPair value = 2;
}
/**
* An action that is part of MultiRequest.
* This is a union type - exactly one of the fields will be set.
@ -359,6 +376,9 @@ service ClientService {
rpc execCoprocessor(ExecCoprocessorRequest)
returns(ExecCoprocessorResponse);
rpc execService(CoprocessorServiceRequest)
returns(CoprocessorServiceResponse);
rpc multi(MultiRequest)
returns(MultiResponse);
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hbase.coprocessor.example.generated";
option java_outer_classname = "ExampleProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message CountRequest {
}
message CountResponse {
required int64 count = 1 [default = 0];
}
service RowCountService {
rpc getRowCount(CountRequest)
returns (CountResponse);
rpc getKeyValueCount(CountRequest)
returns (CountResponse);
}

View File

@ -46,13 +46,16 @@ implements ColumnAggregationProtocol {
.getRegion().getScanner(scan);
try {
List<KeyValue> curVals = new ArrayList<KeyValue>();
boolean done = false;
boolean hasMore = false;
do {
curVals.clear();
done = scanner.next(curVals);
KeyValue kv = curVals.get(0);
sumResult += Bytes.toInt(kv.getBuffer(), kv.getValueOffset());
} while (done);
hasMore = scanner.next(curVals);
for (KeyValue kv : curVals) {
if (Bytes.equals(qualifier, kv.getQualifier())) {
sumResult += Bytes.toInt(kv.getBuffer(), kv.getValueOffset());
}
}
} while (hasMore);
} finally {
scanner.close();
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import java.io.IOException;
/**
* Test implementation of a coprocessor endpoint exposing the
* {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by
* unit tests only.
*/
public class ProtobufCoprocessorService
extends TestRpcServiceProtos.TestProtobufRpcProto
implements CoprocessorService, Coprocessor {
public ProtobufCoprocessorService() {
}
@Override
public Service getService() {
return this;
}
@Override
public void ping(RpcController controller, TestProtos.EmptyRequestProto request,
RpcCallback<TestProtos.EmptyResponseProto> done) {
done.run(TestProtos.EmptyResponseProto.getDefaultInstance());
}
@Override
public void echo(RpcController controller, TestProtos.EchoRequestProto request,
RpcCallback<TestProtos.EchoResponseProto> done) {
String message = request.getMessage();
done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build());
}
@Override
public void error(RpcController controller, TestProtos.EmptyRequestProto request,
RpcCallback<TestProtos.EmptyResponseProto> done) {
ResponseConverter.setControllerException(controller, new IOException("Test exception"));
done.run(null);
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
//To change body of implemented methods use File | Settings | File Templates.
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
//To change body of implemented methods use File | Settings | File Templates.
}
}

View File

@ -18,43 +18,40 @@
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import com.google.protobuf.RpcController;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.ByteString;
import static org.junit.Assert.*;
/**
* TestEndpoint: test cases to verify coprocessor Endpoint
*/
@Category(MediumTests.class)
public class TestCoprocessorEndpoint {
private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
@ -76,27 +73,23 @@ public class TestCoprocessorEndpoint {
// set configure to indicate which cp should be loaded
Configuration conf = util.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint",
"org.apache.hadoop.hbase.coprocessor.GenericEndpoint");
org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
org.apache.hadoop.hbase.coprocessor.GenericEndpoint.class.getName(),
ProtobufCoprocessorService.class.getName());
util.startMiniCluster(2);
HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
new byte[][] { HConstants.EMPTY_BYTE_ARRAY,
ROWS[rowSeperator1], ROWS[rowSeperator2] });
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
util.waitUntilAllRegionsAssigned(3);
admin.close();
HTable table = new HTable(conf, TEST_TABLE);
for (int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
put.setWriteToWAL(false);
put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
table.put(put);
}
// sleep here is an ugly hack to allow region transitions to finish
long timeout = System.currentTimeMillis() + (15 * 1000);
while ((System.currentTimeMillis() < timeout) &&
(table.getRegionsInfo().size() != 2)) {
Thread.sleep(250);
}
table.close();
}
@ -135,7 +128,7 @@ public class TestCoprocessorEndpoint {
table.close();
}
@Ignore @Test
@Test
public void testAggregation() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Map<byte[], Long> results;
@ -143,7 +136,7 @@ public class TestCoprocessorEndpoint {
// scan: for all regions
results = table
.coprocessorExec(ColumnAggregationProtocol.class,
ROWS[rowSeperator1 - 1], ROWS[rowSeperator2 + 1],
ROWS[0], ROWS[ROWS.length-1],
new Batch.Call<ColumnAggregationProtocol, Long>() {
public Long call(ColumnAggregationProtocol instance)
throws IOException {
@ -153,19 +146,20 @@ public class TestCoprocessorEndpoint {
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue();
}
for (int i = 0; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", sumResult, expectedResult);
assertEquals("Invalid result", expectedResult, sumResult);
results.clear();
// scan: for region 2 and region 3
results = table
.coprocessorExec(ColumnAggregationProtocol.class,
ROWS[rowSeperator1 + 1], ROWS[rowSeperator2 + 1],
ROWS[rowSeperator1], ROWS[ROWS.length-1],
new Batch.Call<ColumnAggregationProtocol, Long>() {
public Long call(ColumnAggregationProtocol instance)
throws IOException {
@ -175,15 +169,90 @@ public class TestCoprocessorEndpoint {
sumResult = 0;
expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue();
}
for (int i = rowSeperator1; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", sumResult, expectedResult);
assertEquals("Invalid result", expectedResult, sumResult);
table.close();
}
@Test
public void testCoprocessorService() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
final TestProtos.EchoRequestProto request =
TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
final Map<byte[], String> results = Collections.synchronizedMap(
new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
try {
// scan: for all regions
final RpcController controller = new ServerRpcController();
table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
ROWS[0], ROWS[ROWS.length - 1],
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
throws IOException {
LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
instance.echo(controller, request, callback);
TestProtos.EchoResponseProto response = callback.get();
LOG.debug("Batch.Call returning result " + response);
return response;
}
},
new Batch.Callback<TestProtos.EchoResponseProto>() {
public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
assertNotNull(result);
assertEquals("hello", result.getMessage());
results.put(region, result.getMessage());
}
}
);
for (Map.Entry<byte[], String> e : results.entrySet()) {
LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
}
assertEquals(3, results.size());
for (HRegionInfo info : regions.navigableKeySet()) {
LOG.info("Region info is "+info.getRegionNameAsString());
assertTrue(results.containsKey(info.getRegionName()));
}
results.clear();
// scan: for region 2 and region 3
table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
ROWS[rowSeperator1], ROWS[ROWS.length - 1],
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
throws IOException {
LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
instance.echo(controller, request, callback);
TestProtos.EchoResponseProto response = callback.get();
LOG.debug("Batch.Call returning result " + response);
return response;
}
},
new Batch.Callback<TestProtos.EchoResponseProto>() {
public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
assertNotNull(result);
assertEquals("hello", result.getMessage());
results.put(region, result.getMessage());
}
}
);
for (Map.Entry<byte[], String> e : results.entrySet()) {
LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
}
assertEquals(2, results.size());
} finally {
table.close();
}
}
private static byte[][] makeN(byte[] base, int n) {
byte[][] ret = new byte[n][];
for (int i = 0; i < n; i++) {

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import static junit.framework.Assert.*;
/**
* Test case demonstrating client interactions with the {@link RowCountEndpoint}
* sample coprocessor Service implementation.
*/
@Category(MediumTests.class)
public class TestRowCountEndpoint {
private static final byte[] TEST_TABLE = Bytes.toBytes("testrowcounter");
private static final byte[] TEST_FAMILY = Bytes.toBytes("f");
private static final byte[] TEST_COLUMN = Bytes.toBytes("col");
private static HBaseTestingUtility TEST_UTIL = null;
private static Configuration CONF = null;
@BeforeClass
public static void setupBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
CONF = TEST_UTIL.getConfiguration();
CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
RowCountEndpoint.class.getName());
TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testEndpoint() throws Throwable {
HTable table = new HTable(CONF, TEST_TABLE);
// insert some test rows
for (int i=0; i<5; i++) {
byte[] iBytes = Bytes.toBytes(i);
Put p = new Put(iBytes);
p.add(TEST_FAMILY, TEST_COLUMN, iBytes);
table.put(p);
}
final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
Map<byte[],Long> results = table.coprocessorService(ExampleProtos.RowCountService.class,
null, null,
new Batch.Call<ExampleProtos.RowCountService,Long>() {
public Long call(ExampleProtos.RowCountService counter) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback =
new BlockingRpcCallback<ExampleProtos.CountResponse>();
counter.getRowCount(controller, request, rpcCallback);
ExampleProtos.CountResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return (response != null && response.hasCount()) ? response.getCount() : 0;
}
});
// should be one region with results
assertEquals(1, results.size());
Iterator<Long> iter = results.values().iterator();
Long val = iter.next();
assertNotNull(val);
assertEquals(5l, val.longValue());
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
@ -412,6 +413,12 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
return null;
}
@Override
public ClientProtos.CoprocessorServiceResponse execService(RpcController controller,
ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
return null;
}
@Override
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse multi(
RpcController controller, MultiRequest request) throws ServiceException {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.security.access;
import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.CheckPermissionsRequest;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -27,6 +29,11 @@ import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -52,6 +59,8 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.security.AccessDeniedException;
@ -128,26 +137,32 @@ public class TestAccessController {
// initilize access control
HTable meta = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
AccessControllerProtocol protocol = meta.coprocessorProxy(AccessControllerProtocol.class,
TEST_TABLE);
BlockingRpcChannel service = meta.coprocessorService(TEST_TABLE);
AccessControlService.BlockingInterface protocol =
AccessControlService.newBlockingStub(service);
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TEST_TABLE).get(0);
RegionCoprocessorHost rcpHost = region.getCoprocessorHost();
RCP_ENV = rcpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
Coprocessor.PRIORITY_HIGHEST, 1, conf);
protocol.grant(new UserPermission(Bytes.toBytes(USER_ADMIN.getShortName()),
Permission.Action.ADMIN, Permission.Action.CREATE, Permission.Action.READ,
Permission.Action.WRITE));
protocol.grant(null, newGrantRequest(USER_ADMIN.getShortName(),
null, null, null,
AccessControlProtos.Permission.Action.ADMIN,
AccessControlProtos.Permission.Action.CREATE,
AccessControlProtos.Permission.Action.READ,
AccessControlProtos.Permission.Action.WRITE));
protocol.grant(new UserPermission(Bytes.toBytes(USER_RW.getShortName()), TEST_TABLE,
TEST_FAMILY, Permission.Action.READ, Permission.Action.WRITE));
protocol.grant(null, newGrantRequest(USER_RW.getShortName(),
TEST_TABLE, TEST_FAMILY, null,
AccessControlProtos.Permission.Action.READ,
AccessControlProtos.Permission.Action.WRITE));
protocol.grant(new UserPermission(Bytes.toBytes(USER_RO.getShortName()), TEST_TABLE,
TEST_FAMILY, Permission.Action.READ));
protocol.grant(null, newGrantRequest(USER_RO.getShortName(), TEST_TABLE,
TEST_FAMILY, null, AccessControlProtos.Permission.Action.READ));
protocol.grant(new UserPermission(Bytes.toBytes(USER_CREATE.getShortName()), TEST_TABLE, null,
Permission.Action.CREATE));
protocol.grant(null, newGrantRequest(USER_CREATE.getShortName(),
TEST_TABLE, null, null, AccessControlProtos.Permission.Action.CREATE));
}
@AfterClass
@ -155,6 +170,32 @@ public class TestAccessController {
TEST_UTIL.shutdownMiniCluster();
}
private static AccessControlProtos.GrantRequest newGrantRequest(
String username, byte[] table, byte[] family, byte[] qualifier,
AccessControlProtos.Permission.Action... actions) {
AccessControlProtos.Permission.Builder permissionBuilder =
AccessControlProtos.Permission.newBuilder();
for (AccessControlProtos.Permission.Action a : actions) {
permissionBuilder.addAction(a);
}
if (table != null) {
permissionBuilder.setTable(ByteString.copyFrom(table));
}
if (family != null) {
permissionBuilder.setFamily(ByteString.copyFrom(family));
}
if (qualifier != null) {
permissionBuilder.setQualifier(ByteString.copyFrom(qualifier));
}
return AccessControlProtos.GrantRequest.newBuilder()
.setPermission(
AccessControlProtos.UserPermission.newBuilder()
.setUser(ByteString.copyFromUtf8(username))
.setPermission(permissionBuilder.build())
).build();
}
public void verifyAllowed(User user, PrivilegedExceptionAction... actions) throws Exception {
for (PrivilegedExceptionAction action : actions) {
try {
@ -182,7 +223,13 @@ public class TestAccessController {
// AccessDeniedException
boolean isAccessDeniedException = false;
for (Throwable ex : e.getCauses()) {
if (ex instanceof AccessDeniedException) {
if (ex instanceof ServiceException) {
ServiceException se = (ServiceException)ex;
if (se.getCause() != null && se.getCause() instanceof AccessDeniedException) {
isAccessDeniedException = true;
break;
}
} else if (ex instanceof AccessDeniedException) {
isAccessDeniedException = true;
break;
}
@ -1117,15 +1164,25 @@ public class TestAccessController {
public void checkGlobalPerms(Permission.Action... actions) throws IOException {
HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
new byte[0]);
BlockingRpcChannel channel = acl.coprocessorService(new byte[0]);
AccessControlService.BlockingInterface protocol =
AccessControlService.newBlockingStub(channel);
Permission[] perms = new Permission[actions.length];
for (int i = 0; i < actions.length; i++) {
perms[i] = new Permission(actions[i]);
}
protocol.checkPermissions(perms);
CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
for (Action a : actions) {
request.addPermission(AccessControlProtos.Permission.newBuilder()
.addAction(ProtobufUtil.toPermissionAction(a)).build());
}
try {
protocol.checkPermissions(null, request.build());
} catch (ServiceException se) {
ProtobufUtil.toIOException(se);
}
}
public void checkTablePerms(byte[] table, byte[] family, byte[] column,
@ -1140,22 +1197,39 @@ public class TestAccessController {
public void checkTablePerms(byte[] table, Permission... perms) throws IOException {
HTable acl = new HTable(conf, table);
AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
new byte[0]);
protocol.checkPermissions(perms);
AccessControlService.BlockingInterface protocol =
AccessControlService.newBlockingStub(acl.coprocessorService(new byte[0]));
CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
for (Permission p : perms) {
request.addPermission(ProtobufUtil.toPermission(p));
}
try {
protocol.checkPermissions(null, request.build());
} catch (ServiceException se) {
ProtobufUtil.toIOException(se);
}
}
public void grant(AccessControllerProtocol protocol, User user, byte[] t, byte[] f, byte[] q,
Permission.Action... actions) throws IOException {
protocol.grant(new UserPermission(Bytes.toBytes(user.getShortName()), t, f, q, actions));
public void grant(AccessControlService.BlockingInterface protocol, User user,
byte[] t, byte[] f, byte[] q, Permission.Action... actions)
throws ServiceException {
List<AccessControlProtos.Permission.Action> permActions =
Lists.newArrayListWithCapacity(actions.length);
for (Action a : actions) {
permActions.add(ProtobufUtil.toPermissionAction(a));
}
AccessControlProtos.GrantRequest request =
newGrantRequest(user.getShortName(), t, f, q, permActions.toArray(
new AccessControlProtos.Permission.Action[actions.length]));
protocol.grant(null, request);
}
@Test
public void testCheckPermissions() throws Exception {
final HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
final AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
TEST_TABLE);
BlockingRpcChannel channel = acl.coprocessorService(new byte[0]);
AccessControlService.BlockingInterface protocol =
AccessControlService.newBlockingStub(channel);
// --------------------------------------
// test global permissions
@ -1278,11 +1352,15 @@ public class TestAccessController {
// --------------------------------------
// check for wrong table region
try {
CheckPermissionsRequest checkRequest =
CheckPermissionsRequest.newBuilder().addPermission(
AccessControlProtos.Permission.newBuilder()
.setTable(ByteString.copyFrom(TEST_TABLE)).addAction(AccessControlProtos.Permission.Action.CREATE)
).build();
// but ask for TablePermissions for TEST_TABLE
protocol.checkPermissions(new Permission[] { (Permission) new TablePermission(TEST_TABLE,
null, (byte[]) null, Permission.Action.CREATE) });
protocol.checkPermissions(null, checkRequest);
fail("this should have thrown CoprocessorException");
} catch (CoprocessorException ex) {
} catch (ServiceException ex) {
// expected
}