HBASE-7042 Master Coprocessor Endpoint (Francis Liu)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1409257 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2012-11-14 17:03:06 +00:00
parent 32f23b3d0e
commit 4c5928faf9
13 changed files with 519 additions and 145 deletions


@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -2141,4 +2143,29 @@ public class HBaseAdmin implements Abortable, Closeable {
throw new IOException("Unexpected exception when calling master", e);
}
}
/**
* Creates and returns a {@link com.google.protobuf.RpcChannel} instance
* connected to the active master.
*
* <p>
* The obtained {@link com.google.protobuf.RpcChannel} instance can be used to access a published
* coprocessor {@link com.google.protobuf.Service} using standard protobuf service invocations:
* </p>
*
* <div style="background-color: #cccccc; padding: 2px">
* <blockquote><pre>
* CoprocessorRpcChannel channel = myAdmin.coprocessorService();
* MyService.BlockingInterface service = MyService.newBlockingStub(channel);
* MyCallRequest request = MyCallRequest.newBuilder()
* ...
* .build();
* MyCallResponse response = service.myCall(null, request);
* </pre></blockquote></div>
*
* @return A MasterCoprocessorRpcChannel instance
*/
public CoprocessorRpcChannel coprocessorService() {
return new MasterCoprocessorRpcChannel(connection);
}
}
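Because the returned channel implements both of protobuf's RpcChannel and BlockingRpcChannel interfaces, the non-blocking stub form works as well as the blocking form shown in the javadoc above. A minimal sketch, assuming the same hypothetical MyService generated classes from that example:
// MyService/MyCallRequest/MyCallResponse are hypothetical generated classes;
// only coprocessorService() itself is introduced by this patch.
CoprocessorRpcChannel channel = admin.coprocessorService();
MyService.Stub stub = MyService.newStub(channel);
stub.myCall(null, MyCallRequest.newBuilder().build(),
    new RpcCallback<MyCallResponse>() {
      @Override
      public void run(MyCallResponse response) {
        // Invoked once the master endpoint replies; response is the default
        // instance when the endpoint returned no value.
      }
    });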


@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.io.DataInputInputStream;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@@ -1343,7 +1344,7 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
public CoprocessorRpcChannel coprocessorService(byte[] row) {
return new CoprocessorRpcChannel(connection, tableName, row);
return new RegionCoprocessorRpcChannel(connection, tableName, row);
}
/**
@@ -1420,8 +1421,8 @@
Map<byte[],Future<R>> futures =
new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
for (final byte[] r : keys) {
final CoprocessorRpcChannel channel =
new CoprocessorRpcChannel(connection, tableName, r);
final RegionCoprocessorRpcChannel channel =
new RegionCoprocessorRpcChannel(connection, tableName, r);
Future<R> future = pool.submit(
new Callable<R>() {
public R call() throws Exception {

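From the caller's side the per-region path is unchanged by this refactor; only the channel's concrete class is new. A sketch of single-row usage, again against a hypothetical MyService:
// "mytable" and the MyService classes are placeholders, not part of the patch.
HTable table = new HTable(conf, "mytable");
// The channel routes the call to whichever region currently hosts "row1".
CoprocessorRpcChannel channel = table.coprocessorService(Bytes.toBytes("row1"));
MyService.BlockingInterface service = MyService.newBlockingStub(channel);
MyCallResponse response = service.myCall(null, MyCallRequest.newBuilder().build());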

@@ -22,42 +22,18 @@ import com.google.protobuf.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
/**
* Provides clients with an RPC connection to call coprocessor endpoint {@link Service}s
* against a given table region. An instance of this class may be obtained
* by calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])},
* but should normally only be used in creating a new {@link Service} stub to call the endpoint
* methods.
* @see org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])
* Base class which provides clients with an RPC connection to
* call coprocessor endpoint {@link Service}s
*/
@InterfaceAudience.Private
public class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel {
public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel {
private static Log LOG = LogFactory.getLog(CoprocessorRpcChannel.class);
private final HConnection connection;
private final byte[] table;
private final byte[] row;
private byte[] lastRegion;
public CoprocessorRpcChannel(HConnection conn, byte[] table, byte[] row) {
this.connection = conn;
this.table = table;
this.row = row;
}
@Override
public void callMethod(Descriptors.MethodDescriptor method,
RpcController controller,
@@ -87,46 +63,6 @@ public class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel {
}
}
private Message callExecService(Descriptors.MethodDescriptor method,
Message request, Message responsePrototype)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Call: "+method.getName()+", "+request.toString());
}
if (row == null) {
throw new IllegalArgumentException("Missing row property for remote region location");
}
final ClientProtos.CoprocessorServiceCall call =
ClientProtos.CoprocessorServiceCall.newBuilder()
.setRow(ByteString.copyFrom(row))
.setServiceName(method.getService().getFullName())
.setMethodName(method.getName())
.setRequest(request.toByteString()).build();
ServerCallable<ClientProtos.CoprocessorServiceResponse> callable =
new ServerCallable<ClientProtos.CoprocessorServiceResponse>(connection, table, row) {
public CoprocessorServiceResponse call() throws Exception {
byte[] regionName = location.getRegionInfo().getRegionName();
return ProtobufUtil.execService(server, call, regionName);
}
};
CoprocessorServiceResponse result = callable.withRetries();
Message response = null;
if (result.getValue().hasValue()) {
response = responsePrototype.newBuilderForType()
.mergeFrom(result.getValue().getValue()).build();
} else {
response = responsePrototype.getDefaultInstanceForType();
}
lastRegion = result.getRegion().getValue().toByteArray();
if (LOG.isTraceEnabled()) {
LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
}
return response;
}
public byte[] getLastRegion() {
return lastRegion;
}
protected abstract Message callExecService(Descriptors.MethodDescriptor method,
Message request, Message responsePrototype) throws IOException;
}


@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import java.io.IOException;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
/**
* Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s
* against the active master. An instance of this class may be obtained
* by calling {@link org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService()},
* but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to call the endpoint
* methods.
* @see org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService()
*/
@InterfaceAudience.Private
public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel {
private static Log LOG = LogFactory.getLog(MasterCoprocessorRpcChannel.class);
private final HConnection connection;
public MasterCoprocessorRpcChannel(HConnection conn) {
this.connection = conn;
}
@Override
protected Message callExecService(Descriptors.MethodDescriptor method,
Message request, Message responsePrototype)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Call: "+method.getName()+", "+request.toString());
}
final ClientProtos.CoprocessorServiceCall call =
ClientProtos.CoprocessorServiceCall.newBuilder()
.setRow(ByteString.copyFrom(HConstants.EMPTY_BYTE_ARRAY))
.setServiceName(method.getService().getFullName())
.setMethodName(method.getName())
.setRequest(request.toByteString()).build();
CoprocessorServiceResponse result = ProtobufUtil.execService(connection.getMasterAdmin(), call);
Message response = null;
if (result.getValue().hasValue()) {
response = responsePrototype.newBuilderForType()
.mergeFrom(result.getValue().getValue()).build();
} else {
response = responsePrototype.getDefaultInstanceForType();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Master Result is value=" + response);
}
return response;
}
}


@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
/**
* Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s
* against a given table region. An instance of this class may be obtained
* by calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])},
* but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to call the endpoint
* methods.
* @see org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])
*/
@InterfaceAudience.Private
public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel {
private static Log LOG = LogFactory.getLog(RegionCoprocessorRpcChannel.class);
private final HConnection connection;
private final byte[] table;
private final byte[] row;
private byte[] lastRegion;
public RegionCoprocessorRpcChannel(HConnection conn, byte[] table, byte[] row) {
this.connection = conn;
this.table = table;
this.row = row;
}
@Override
protected Message callExecService(Descriptors.MethodDescriptor method,
Message request, Message responsePrototype)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Call: "+method.getName()+", "+request.toString());
}
if (row == null) {
throw new IllegalArgumentException("Missing row property for remote region location");
}
final ClientProtos.CoprocessorServiceCall call =
ClientProtos.CoprocessorServiceCall.newBuilder()
.setRow(ByteString.copyFrom(row))
.setServiceName(method.getService().getFullName())
.setMethodName(method.getName())
.setRequest(request.toByteString()).build();
ServerCallable<CoprocessorServiceResponse> callable =
new ServerCallable<CoprocessorServiceResponse>(connection, table, row) {
public CoprocessorServiceResponse call() throws Exception {
byte[] regionName = location.getRegionInfo().getRegionName();
return ProtobufUtil.execService(server, call, regionName);
}
};
CoprocessorServiceResponse result = callable.withRetries();
Message response = null;
if (result.getValue().hasValue()) {
response = responsePrototype.newBuilderForType()
.mergeFrom(result.getValue().getValue()).build();
} else {
response = responsePrototype.getDefaultInstanceForType();
}
lastRegion = result.getRegion().getValue().toByteArray();
if (LOG.isTraceEnabled()) {
LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
}
return response;
}
public byte[] getLastRegion() {
return lastRegion;
}
}
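Unlike the master channel, the region channel records which region served the most recent call, which a caller can read back when correlating per-region results. A hedged sketch, with the same hypothetical service names as above:
RegionCoprocessorRpcChannel channel =
    new RegionCoprocessorRpcChannel(connection, tableName, row);
MyService.BlockingInterface service = MyService.newBlockingStub(channel);
MyCallResponse response = service.myCall(null, request);
// Name of the region that executed the call above.
byte[] regionName = channel.getLastRegion();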


@@ -40,6 +40,11 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.management.ObjectName;
import com.google.common.collect.Maps;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -82,6 +87,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
@@ -101,7 +107,9 @@ import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@ -159,6 +167,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -309,6 +318,8 @@ Server {
private SpanReceiverHost spanReceiverHost;
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
/**
* Initializes the HMaster. The steps are as follows:
* <p>
@@ -2274,6 +2285,79 @@ Server {
return OfflineRegionResponse.newBuilder().build();
}
@Override
public boolean registerService(Service instance) {
/*
* No stacking of instances is allowed for a single service name
*/
Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
LOG.error("Coprocessor service "+serviceDesc.getFullName()+
" already registered, rejecting request from "+instance
);
return false;
}
coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
if (LOG.isDebugEnabled()) {
LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
}
return true;
}
@Override
public ClientProtos.CoprocessorServiceResponse execMasterService(final RpcController controller,
final ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
try {
ServerRpcController execController = new ServerRpcController();
ClientProtos.CoprocessorServiceCall call = request.getCall();
String serviceName = call.getServiceName();
String methodName = call.getMethodName();
if (!coprocessorServiceHandlers.containsKey(serviceName)) {
throw new HBaseRPC.UnknownProtocolException(null,
"No registered master coprocessor service found for name "+serviceName);
}
Service service = coprocessorServiceHandlers.get(serviceName);
Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
if (methodDesc == null) {
throw new HBaseRPC.UnknownProtocolException(service.getClass(),
"Unknown method "+methodName+" called on master service "+serviceName);
}
//invoke the method
Message execRequest = service.getRequestPrototype(methodDesc).newBuilderForType()
.mergeFrom(call.getRequest()).build();
final Message.Builder responseBuilder =
service.getResponsePrototype(methodDesc).newBuilderForType();
service.callMethod(methodDesc, execController, execRequest, new RpcCallback<Message>() {
@Override
public void run(Message message) {
if (message != null) {
responseBuilder.mergeFrom(message);
}
}
});
Message execResult = responseBuilder.build();
if (execController.getFailedOn() != null) {
throw execController.getFailedOn();
}
ClientProtos.CoprocessorServiceResponse.Builder builder =
ClientProtos.CoprocessorServiceResponse.newBuilder();
builder.setRegion(RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, HConstants.EMPTY_BYTE_ARRAY));
builder.setValue(
builder.getValueBuilder().setName(execResult.getClass().getName())
.setValue(execResult.toByteString()));
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
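When the endpoint sets a failure on its controller, execMasterService rethrows it and the client receives it wrapped in a ServiceException. A sketch of client-side unwrapping using the ProtobufUtil helper this patch already relies on (the service stub is hypothetical):
try {
  service.myCall(null, request);
} catch (ServiceException e) {
  // Recovers the IOException the endpoint reported through its controller.
  IOException cause = ProtobufUtil.getRemoteException(e);
  throw cause;
}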
/**
* Utility for constructing an instance of the passed HMaster class.
* @param masterClass


@@ -70,6 +70,11 @@ public class MasterCoprocessorHost
public MasterEnvironment createEnvironment(final Class<?> implClass,
final Coprocessor instance, final int priority, final int seq,
final Configuration conf) {
for (Class c : implClass.getInterfaces()) {
if (CoprocessorService.class.isAssignableFrom(c)) {
masterServices.registerService(((CoprocessorService)instance).getService());
}
}
return new MasterEnvironment(implClass, instance, priority, seq, conf,
masterServices);
}
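Any class named in the master coprocessor configuration that also implements CoprocessorService is picked up by the loop above and handed to registerService(). A sketch of such an endpoint, assuming a hypothetical generated EchoService (the test below wires up ProtobufCoprocessorService the same way):
import java.io.IOException;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
// EchoService and its request/response types are hypothetical generated classes.
public class EchoMasterEndpoint extends EchoService
    implements Coprocessor, CoprocessorService {
  @Override
  public Service getService() {
    return this;  // handed to MasterServices.registerService() by the host
  }
  @Override
  public void echo(RpcController controller, EchoRequest request,
      RpcCallback<EchoResponse> done) {
    done.run(EchoResponse.newBuilder().setMessage(request.getMessage()).build());
  }
  @Override
  public void start(CoprocessorEnvironment env) throws IOException { }
  @Override
  public void stop(CoprocessorEnvironment env) throws IOException { }
}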


@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import com.google.protobuf.Service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
@@ -77,4 +78,23 @@ public interface MasterServices extends Server {
* @return true if master enables ServerShutdownHandler;
*/
public boolean isServerShutdownHandlerEnabled();
/**
* Registers a new protocol buffer {@link Service} subclass as a master coprocessor endpoint to
* be available for handling
* {@link org.apache.hadoop.hbase.MasterAdminProtocol#execMasterService(com.google.protobuf.RpcController,
* org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)} calls.
*
* <p>
* Only a single instance may be registered for a given {@link Service} subclass (the
* instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
* After the first registration, subsequent calls with the same service name will fail with
* a return value of {@code false}.
* </p>
* @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
* @return {@code true} if the registration was successful, {@code false}
* otherwise
*/
public boolean registerService(Service instance);
}
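A short sketch of the contract described above, assuming master is a MasterServices implementation and svc is some protobuf Service instance:
boolean first = master.registerService(svc);   // true: the service name was free
boolean second = master.registerService(svc);  // false: that full name is already taken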


@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterAdminProtocol;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.AdminProtocol;
@@ -1347,6 +1348,20 @@ public final class ProtobufUtil {
}
}
public static CoprocessorServiceResponse execService(final MasterAdminProtocol client,
final CoprocessorServiceCall call) throws IOException {
CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder()
.setCall(call).setRegion(
RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build();
try {
CoprocessorServiceResponse response =
client.execMasterService(null, request);
return response;
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
@SuppressWarnings("unchecked")
public static <T extends Service> T newServiceStub(Class<T> service, RpcChannel channel)
throws Exception {


@@ -14464,6 +14464,11 @@ public final class MasterAdminProtos {
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse> done);
public abstract void execMasterService(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done);
}
public static com.google.protobuf.Service newReflectiveService(
@@ -14621,6 +14626,14 @@ public final class MasterAdminProtos {
impl.isCatalogJanitorEnabled(controller, request, done);
}
@java.lang.Override
public void execMasterService(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done) {
impl.execMasterService(controller, request, done);
}
};
}
@@ -14681,6 +14694,8 @@ public final class MasterAdminProtos {
return impl.enableCatalogJanitor(controller, (org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest)request);
case 18:
return impl.isCatalogJanitorEnabled(controller, (org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest)request);
case 19:
return impl.execMasterService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request);
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -14733,6 +14748,8 @@ public final class MasterAdminProtos {
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest.getDefaultInstance();
case 18:
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest.getDefaultInstance();
case 19:
return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -14785,6 +14802,8 @@ public final class MasterAdminProtos {
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse.getDefaultInstance();
case 18:
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance();
case 19:
return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -14888,6 +14907,11 @@ public final class MasterAdminProtos {
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse> done);
public abstract void execMasterService(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done);
public static final
com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptor() {
@@ -15005,6 +15029,11 @@ public final class MasterAdminProtos {
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse>specializeCallback(
done));
return;
case 19:
this.execMasterService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request,
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse>specializeCallback(
done));
return;
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -15057,6 +15086,8 @@ public final class MasterAdminProtos {
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest.getDefaultInstance();
case 18:
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest.getDefaultInstance();
case 19:
return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -15109,6 +15140,8 @@ public final class MasterAdminProtos {
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse.getDefaultInstance();
case 18:
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance();
case 19:
return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@@ -15414,6 +15447,21 @@ public final class MasterAdminProtos {
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.class,
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance()));
}
public void execMasterService(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse> done) {
channel.callMethod(
getDescriptor().getMethods().get(19),
controller,
request,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance(),
com.google.protobuf.RpcUtil.generalizeCallback(
done,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.class,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance()));
}
}
public static BlockingInterface newBlockingStub(
@@ -15516,6 +15564,11 @@ public final class MasterAdminProtos {
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest request)
throws com.google.protobuf.ServiceException;
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse execMasterService(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request)
throws com.google.protobuf.ServiceException;
}
private static final class BlockingStub implements BlockingInterface {
@@ -15752,6 +15805,18 @@ public final class MasterAdminProtos {
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance());
}
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse execMasterService(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request)
throws com.google.protobuf.ServiceException {
return (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse) channel.callBlockingMethod(
getDescriptor().getMethods().get(19),
controller,
request,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance());
}
}
}
@@ -15954,79 +16019,81 @@ public final class MasterAdminProtos {
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\021MasterAdmin.proto\032\013hbase.proto\"R\n\020AddC" +
"olumnRequest\022\021\n\ttableName\030\001 \002(\014\022+\n\016colum" +
"nFamilies\030\002 \002(\0132\023.ColumnFamilySchema\"\023\n\021" +
"AddColumnResponse\"<\n\023DeleteColumnRequest" +
"\022\021\n\ttableName\030\001 \002(\014\022\022\n\ncolumnName\030\002 \002(\014\"" +
"\026\n\024DeleteColumnResponse\"U\n\023ModifyColumnR" +
"equest\022\021\n\ttableName\030\001 \002(\014\022+\n\016columnFamil" +
"ies\030\002 \002(\0132\023.ColumnFamilySchema\"\026\n\024Modify" +
"ColumnResponse\"Z\n\021MoveRegionRequest\022 \n\006r" +
"egion\030\001 \002(\0132\020.RegionSpecifier\022#\n\016destSer",
"verName\030\002 \001(\0132\013.ServerName\"\024\n\022MoveRegion" +
"Response\"7\n\023AssignRegionRequest\022 \n\006regio" +
"n\030\001 \002(\0132\020.RegionSpecifier\"\026\n\024AssignRegio" +
"nResponse\"O\n\025UnassignRegionRequest\022 \n\006re" +
"gion\030\001 \002(\0132\020.RegionSpecifier\022\024\n\005force\030\002 " +
"\001(\010:\005false\"\030\n\026UnassignRegionResponse\"8\n\024" +
"OfflineRegionRequest\022 \n\006region\030\001 \002(\0132\020.R" +
"egionSpecifier\"\027\n\025OfflineRegionResponse\"" +
"J\n\022CreateTableRequest\022!\n\013tableSchema\030\001 \002" +
"(\0132\014.TableSchema\022\021\n\tsplitKeys\030\002 \003(\014\"\025\n\023C",
"reateTableResponse\"\'\n\022DeleteTableRequest" +
"\022\021\n\ttableName\030\001 \002(\014\"\025\n\023DeleteTableRespon" +
"se\"\'\n\022EnableTableRequest\022\021\n\ttableName\030\001 " +
"\002(\014\"\025\n\023EnableTableResponse\"(\n\023DisableTab" +
"leRequest\022\021\n\ttableName\030\001 \002(\014\"\026\n\024DisableT" +
"ableResponse\"J\n\022ModifyTableRequest\022\021\n\tta" +
"bleName\030\001 \002(\014\022!\n\013tableSchema\030\002 \002(\0132\014.Tab" +
"leSchema\"\025\n\023ModifyTableResponse\"\021\n\017Shutd" +
"ownRequest\"\022\n\020ShutdownResponse\"\023\n\021StopMa" +
"sterRequest\"\024\n\022StopMasterResponse\"\020\n\016Bal",
"anceRequest\"&\n\017BalanceResponse\022\023\n\013balanc" +
"erRan\030\001 \002(\010\"<\n\031SetBalancerRunningRequest" +
"\022\n\n\002on\030\001 \002(\010\022\023\n\013synchronous\030\002 \001(\010\"6\n\032Set" +
"BalancerRunningResponse\022\030\n\020prevBalanceVa" +
"lue\030\001 \001(\010\"\024\n\022CatalogScanRequest\")\n\023Catal" +
"ogScanResponse\022\022\n\nscanResult\030\001 \001(\005\"-\n\033En" +
"ableCatalogJanitorRequest\022\016\n\006enable\030\001 \002(" +
"\010\"1\n\034EnableCatalogJanitorResponse\022\021\n\tpre" +
"vValue\030\001 \001(\010\" \n\036IsCatalogJanitorEnabledR" +
"equest\"0\n\037IsCatalogJanitorEnabledRespons",
"e\022\r\n\005value\030\001 \002(\0102\263\t\n\022MasterAdminService\022" +
"2\n\taddColumn\022\021.AddColumnRequest\032\022.AddCol" +
"umnResponse\022;\n\014deleteColumn\022\024.DeleteColu" +
"mnRequest\032\025.DeleteColumnResponse\022;\n\014modi" +
"fyColumn\022\024.ModifyColumnRequest\032\025.ModifyC" +
"olumnResponse\0225\n\nmoveRegion\022\022.MoveRegion" +
"Request\032\023.MoveRegionResponse\022;\n\014assignRe" +
"gion\022\024.AssignRegionRequest\032\025.AssignRegio" +
"nResponse\022A\n\016unassignRegion\022\026.UnassignRe" +
"gionRequest\032\027.UnassignRegionResponse\022>\n\r",
"offlineRegion\022\025.OfflineRegionRequest\032\026.O" +
"fflineRegionResponse\0228\n\013deleteTable\022\023.De" +
"leteTableRequest\032\024.DeleteTableResponse\0228" +
"\n\013enableTable\022\023.EnableTableRequest\032\024.Ena" +
"bleTableResponse\022;\n\014disableTable\022\024.Disab" +
"leTableRequest\032\025.DisableTableResponse\0228\n" +
"\013modifyTable\022\023.ModifyTableRequest\032\024.Modi" +
"fyTableResponse\0228\n\013createTable\022\023.CreateT" +
"ableRequest\032\024.CreateTableResponse\022/\n\010shu" +
"tdown\022\020.ShutdownRequest\032\021.ShutdownRespon",
"se\0225\n\nstopMaster\022\022.StopMasterRequest\032\023.S" +
"topMasterResponse\022,\n\007balance\022\017.BalanceRe" +
"quest\032\020.BalanceResponse\022M\n\022setBalancerRu" +
"nning\022\032.SetBalancerRunningRequest\032\033.SetB" +
"alancerRunningResponse\022;\n\016runCatalogScan" +
"\022\023.CatalogScanRequest\032\024.CatalogScanRespo" +
"nse\022S\n\024enableCatalogJanitor\022\034.EnableCata" +
"logJanitorRequest\032\035.EnableCatalogJanitor" +
"Response\022\\\n\027isCatalogJanitorEnabled\022\037.Is" +
"CatalogJanitorEnabledRequest\032 .IsCatalog",
"JanitorEnabledResponseBG\n*org.apache.had" +
"oop.hbase.protobuf.generatedB\021MasterAdmi" +
"nProtosH\001\210\001\001\240\001\001"
"\n\021MasterAdmin.proto\032\013hbase.proto\032\014Client" +
".proto\"R\n\020AddColumnRequest\022\021\n\ttableName\030" +
"\001 \002(\014\022+\n\016columnFamilies\030\002 \002(\0132\023.ColumnFa" +
"milySchema\"\023\n\021AddColumnResponse\"<\n\023Delet" +
"eColumnRequest\022\021\n\ttableName\030\001 \002(\014\022\022\n\ncol" +
"umnName\030\002 \002(\014\"\026\n\024DeleteColumnResponse\"U\n" +
"\023ModifyColumnRequest\022\021\n\ttableName\030\001 \002(\014\022" +
"+\n\016columnFamilies\030\002 \002(\0132\023.ColumnFamilySc" +
"hema\"\026\n\024ModifyColumnResponse\"Z\n\021MoveRegi" +
"onRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif",
"ier\022#\n\016destServerName\030\002 \001(\0132\013.ServerName" +
"\"\024\n\022MoveRegionResponse\"7\n\023AssignRegionRe" +
"quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\"" +
"\026\n\024AssignRegionResponse\"O\n\025UnassignRegio" +
"nRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi" +
"er\022\024\n\005force\030\002 \001(\010:\005false\"\030\n\026UnassignRegi" +
"onResponse\"8\n\024OfflineRegionRequest\022 \n\006re" +
"gion\030\001 \002(\0132\020.RegionSpecifier\"\027\n\025OfflineR" +
"egionResponse\"J\n\022CreateTableRequest\022!\n\013t" +
"ableSchema\030\001 \002(\0132\014.TableSchema\022\021\n\tsplitK",
"eys\030\002 \003(\014\"\025\n\023CreateTableResponse\"\'\n\022Dele" +
"teTableRequest\022\021\n\ttableName\030\001 \002(\014\"\025\n\023Del" +
"eteTableResponse\"\'\n\022EnableTableRequest\022\021" +
"\n\ttableName\030\001 \002(\014\"\025\n\023EnableTableResponse" +
"\"(\n\023DisableTableRequest\022\021\n\ttableName\030\001 \002" +
"(\014\"\026\n\024DisableTableResponse\"J\n\022ModifyTabl" +
"eRequest\022\021\n\ttableName\030\001 \002(\014\022!\n\013tableSche" +
"ma\030\002 \002(\0132\014.TableSchema\"\025\n\023ModifyTableRes" +
"ponse\"\021\n\017ShutdownRequest\"\022\n\020ShutdownResp" +
"onse\"\023\n\021StopMasterRequest\"\024\n\022StopMasterR",
"esponse\"\020\n\016BalanceRequest\"&\n\017BalanceResp" +
"onse\022\023\n\013balancerRan\030\001 \002(\010\"<\n\031SetBalancer" +
"RunningRequest\022\n\n\002on\030\001 \002(\010\022\023\n\013synchronou" +
"s\030\002 \001(\010\"6\n\032SetBalancerRunningResponse\022\030\n" +
"\020prevBalanceValue\030\001 \001(\010\"\024\n\022CatalogScanRe" +
"quest\")\n\023CatalogScanResponse\022\022\n\nscanResu" +
"lt\030\001 \001(\005\"-\n\033EnableCatalogJanitorRequest\022" +
"\016\n\006enable\030\001 \002(\010\"1\n\034EnableCatalogJanitorR" +
"esponse\022\021\n\tprevValue\030\001 \001(\010\" \n\036IsCatalogJ" +
"anitorEnabledRequest\"0\n\037IsCatalogJanitor",
"EnabledResponse\022\r\n\005value\030\001 \002(\0102\201\n\n\022Maste" +
"rAdminService\0222\n\taddColumn\022\021.AddColumnRe" +
"quest\032\022.AddColumnResponse\022;\n\014deleteColum" +
"n\022\024.DeleteColumnRequest\032\025.DeleteColumnRe" +
"sponse\022;\n\014modifyColumn\022\024.ModifyColumnReq" +
"uest\032\025.ModifyColumnResponse\0225\n\nmoveRegio" +
"n\022\022.MoveRegionRequest\032\023.MoveRegionRespon" +
"se\022;\n\014assignRegion\022\024.AssignRegionRequest" +
"\032\025.AssignRegionResponse\022A\n\016unassignRegio" +
"n\022\026.UnassignRegionRequest\032\027.UnassignRegi",
"onResponse\022>\n\rofflineRegion\022\025.OfflineReg" +
"ionRequest\032\026.OfflineRegionResponse\0228\n\013de" +
"leteTable\022\023.DeleteTableRequest\032\024.DeleteT" +
"ableResponse\0228\n\013enableTable\022\023.EnableTabl" +
"eRequest\032\024.EnableTableResponse\022;\n\014disabl" +
"eTable\022\024.DisableTableRequest\032\025.DisableTa" +
"bleResponse\0228\n\013modifyTable\022\023.ModifyTable" +
"Request\032\024.ModifyTableResponse\0228\n\013createT" +
"able\022\023.CreateTableRequest\032\024.CreateTableR" +
"esponse\022/\n\010shutdown\022\020.ShutdownRequest\032\021.",
"ShutdownResponse\0225\n\nstopMaster\022\022.StopMas" +
"terRequest\032\023.StopMasterResponse\022,\n\007balan" +
"ce\022\017.BalanceRequest\032\020.BalanceResponse\022M\n" +
"\022setBalancerRunning\022\032.SetBalancerRunning" +
"Request\032\033.SetBalancerRunningResponse\022;\n\016" +
"runCatalogScan\022\023.CatalogScanRequest\032\024.Ca" +
"talogScanResponse\022S\n\024enableCatalogJanito" +
"r\022\034.EnableCatalogJanitorRequest\032\035.Enable" +
"CatalogJanitorResponse\022\\\n\027isCatalogJanit" +
"orEnabled\022\037.IsCatalogJanitorEnabledReque",
"st\032 .IsCatalogJanitorEnabledResponse\022L\n\021" +
"execMasterService\022\032.CoprocessorServiceRe" +
"quest\032\033.CoprocessorServiceResponseBG\n*or" +
"g.apache.hadoop.hbase.protobuf.generated" +
"B\021MasterAdminProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -16344,6 +16411,7 @@ public final class MasterAdminProtos {
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.getDescriptor(),
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.getDescriptor(),
}, assigner);
}

View File

@@ -25,6 +25,7 @@ option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "hbase.proto";
import "Client.proto";
/* Column-level protobufs */
@@ -273,4 +274,10 @@ service MasterAdminService {
*/
rpc isCatalogJanitorEnabled(IsCatalogJanitorEnabledRequest)
returns(IsCatalogJanitorEnabledResponse);
/**
* Call a master coprocessor endpoint
*/
rpc execMasterService(CoprocessorServiceRequest)
returns(CoprocessorServiceResponse);
}


@@ -45,6 +45,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
/**
* TestEndpoint: test cases to verify coprocessor Endpoint
@@ -76,6 +77,8 @@ public class TestCoprocessorEndpoint {
org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
org.apache.hadoop.hbase.coprocessor.GenericEndpoint.class.getName(),
ProtobufCoprocessorService.class.getName());
conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
ProtobufCoprocessorService.class.getName());
util.startMiniCluster(2);
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
@@ -253,6 +256,17 @@ public class TestCoprocessorEndpoint {
}
}
@Test
public void testMasterCoprocessorService() throws Throwable {
HBaseAdmin admin = util.getHBaseAdmin();
final TestProtos.EchoRequestProto request =
TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
assertEquals("hello", service.echo(null, request).getMessage());
admin.close();
}
private static byte[][] makeN(byte[] base, int n) {
byte[][] ret = new byte[n][];
for (int i = 0; i < n; i++) {


@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -288,6 +289,11 @@ public class TestCatalogJanitor {
public boolean isServerShutdownHandlerEnabled() {
return true;
}
@Override
public boolean registerService(Service instance) {
return false;
}
}
@Test