From 4c5928faf9fb85ddea248adfda5f469cf55e1b0a Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Wed, 14 Nov 2012 17:03:06 +0000 Subject: [PATCH] HBASE-7042 Master Coprocessor Endpoint (Francis Liu) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1409257 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/client/HBaseAdmin.java | 27 +++ .../apache/hadoop/hbase/client/HTable.java | 7 +- .../hbase/ipc/CoprocessorRpcChannel.java | 74 +----- .../ipc/MasterCoprocessorRpcChannel.java | 88 +++++++ .../ipc/RegionCoprocessorRpcChannel.java | 103 +++++++++ .../apache/hadoop/hbase/master/HMaster.java | 84 +++++++ .../hbase/master/MasterCoprocessorHost.java | 5 + .../hadoop/hbase/master/MasterServices.java | 20 ++ .../hadoop/hbase/protobuf/ProtobufUtil.java | 15 ++ .../protobuf/generated/MasterAdminProtos.java | 214 ++++++++++++------ .../src/main/protobuf/MasterAdmin.proto | 7 + .../coprocessor/TestCoprocessorEndpoint.java | 14 ++ .../hbase/master/TestCatalogJanitor.java | 6 + 13 files changed, 519 insertions(+), 145 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 9e1b0b84e42..cdb72a15227 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; @@ -2141,4 +2143,29 @@ public class HBaseAdmin implements Abortable, Closeable { throw new IOException("Unexpected exception when calling master", e); } } + + /** + * Creates and returns a {@link com.google.protobuf.RpcChannel} instance + * connected to the active master. + * + *

+ * The obtained {@link com.google.protobuf.RpcChannel} instance can be used to access a published + * coprocessor {@link com.google.protobuf.Service} using standard protobuf service invocations: + *

+ * + *
+ *
+   * CoprocessorRpcChannel channel = myAdmin.coprocessorService();
+   * MyService.BlockingInterface service = MyService.newBlockingStub(channel);
+   * MyCallRequest request = MyCallRequest.newBuilder()
+   *     ...
+   *     .build();
+   * MyCallResponse response = service.myCall(null, request);
+   * 
+ * + * @return A MasterCoprocessorRpcChannel instance + */ + public CoprocessorRpcChannel coprocessorService() { + return new MasterCoprocessorRpcChannel(connection); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java index b7b15065b33..2a29eab954a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.io.DataInputInputStream; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.ExecRPCInvoker; +import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; @@ -1343,7 +1344,7 @@ public class HTable implements HTableInterface { * {@inheritDoc} */ public CoprocessorRpcChannel coprocessorService(byte[] row) { - return new CoprocessorRpcChannel(connection, tableName, row); + return new RegionCoprocessorRpcChannel(connection, tableName, row); } /** @@ -1420,8 +1421,8 @@ public class HTable implements HTableInterface { Map> futures = new TreeMap>(Bytes.BYTES_COMPARATOR); for (final byte[] r : keys) { - final CoprocessorRpcChannel channel = - new CoprocessorRpcChannel(connection, tableName, r); + final RegionCoprocessorRpcChannel channel = + new RegionCoprocessorRpcChannel(connection, tableName, r); Future future = pool.submit( new Callable() { public R call() throws Exception { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java index 92e7ef35ef3..d3d92e2c9b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java @@ -22,42 +22,18 @@ import com.google.protobuf.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.ServerCallable; -import org.apache.hadoop.hbase.client.coprocessor.Exec; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; -import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; - /** - * Provides clients with an RPC connection to call coprocessor endpoint {@link Service}s - * against a given table region. An instance of this class may be obtained - * by calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}, - * but should normally only be used in creating a new {@link Service} stub to call the endpoint - * methods. - * @see org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[]) + * Base class which provides clients with an RPC connection to + * call coprocessor endpoint {@link Service}s */ @InterfaceAudience.Private -public class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel { +public abstract class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel { private static Log LOG = LogFactory.getLog(CoprocessorRpcChannel.class); - private final HConnection connection; - private final byte[] table; - private final byte[] row; - private byte[] lastRegion; - - public CoprocessorRpcChannel(HConnection conn, byte[] table, byte[] row) { - this.connection = conn; - this.table = table; - this.row = row; - } - @Override public void callMethod(Descriptors.MethodDescriptor method, RpcController controller, @@ -87,46 +63,6 @@ public class CoprocessorRpcChannel implements RpcChannel, BlockingRpcChannel { } } - private Message callExecService(Descriptors.MethodDescriptor method, - Message request, Message responsePrototype) - throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Call: "+method.getName()+", "+request.toString()); - } - - if (row == null) { - throw new IllegalArgumentException("Missing row property for remote region location"); - } - - final ClientProtos.CoprocessorServiceCall call = - ClientProtos.CoprocessorServiceCall.newBuilder() - .setRow(ByteString.copyFrom(row)) - .setServiceName(method.getService().getFullName()) - .setMethodName(method.getName()) - .setRequest(request.toByteString()).build(); - ServerCallable callable = - new ServerCallable(connection, table, row) { - public CoprocessorServiceResponse call() throws Exception { - byte[] regionName = location.getRegionInfo().getRegionName(); - return ProtobufUtil.execService(server, call, regionName); - } - }; - CoprocessorServiceResponse result = callable.withRetries(); - Message response = null; - if (result.getValue().hasValue()) { - response = responsePrototype.newBuilderForType() - .mergeFrom(result.getValue().getValue()).build(); - } else { - response = responsePrototype.getDefaultInstanceForType(); - } - lastRegion = result.getRegion().getValue().toByteArray(); - if (LOG.isTraceEnabled()) { - LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response); - } - return response; - } - - public byte[] getLastRegion() { - return lastRegion; - } + protected abstract Message callExecService(Descriptors.MethodDescriptor method, + Message request, Message responsePrototype) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java new file mode 100644 index 00000000000..b3a4228053e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +import java.io.IOException; + +import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; + +/** + * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s + * against the active master. An instance of this class may be obtained + * by calling {@link org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService()}, + * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to call the endpoint + * methods. + * @see org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService() + */ +@InterfaceAudience.Private +public class MasterCoprocessorRpcChannel extends CoprocessorRpcChannel{ + private static Log LOG = LogFactory.getLog(MasterCoprocessorRpcChannel.class); + + private final HConnection connection; + + public MasterCoprocessorRpcChannel(HConnection conn) { + this.connection = conn; + } + + @Override + protected Message callExecService(Descriptors.MethodDescriptor method, + Message request, Message responsePrototype) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Call: "+method.getName()+", "+request.toString()); + } + + final ClientProtos.CoprocessorServiceCall call = + ClientProtos.CoprocessorServiceCall.newBuilder() + .setRow(ByteString.copyFrom(HConstants.EMPTY_BYTE_ARRAY)) + .setServiceName(method.getService().getFullName()) + .setMethodName(method.getName()) + .setRequest(request.toByteString()).build(); + CoprocessorServiceResponse result = ProtobufUtil.execService(connection.getMasterAdmin(), call); + Message response = null; + if (result.getValue().hasValue()) { + response = responsePrototype.newBuilderForType() + .mergeFrom(result.getValue().getValue()).build(); + } else { + response = responsePrototype.getDefaultInstanceForType(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Master Result is value=" + response); + } + return response; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java new file mode 100644 index 00000000000..e5855c976e8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; + +import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; + +/** + * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s + * against a given table region. An instance of this class may be obtained + * by calling {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])}, + * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to call the endpoint + * methods. + * @see org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[]) + */ +@InterfaceAudience.Private +public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ + private static Log LOG = LogFactory.getLog(RegionCoprocessorRpcChannel.class); + + private final HConnection connection; + private final byte[] table; + private final byte[] row; + private byte[] lastRegion; + + public RegionCoprocessorRpcChannel(HConnection conn, byte[] table, byte[] row) { + this.connection = conn; + this.table = table; + this.row = row; + } + + @Override + protected Message callExecService(Descriptors.MethodDescriptor method, + Message request, Message responsePrototype) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Call: "+method.getName()+", "+request.toString()); + } + + if (row == null) { + throw new IllegalArgumentException("Missing row property for remote region location"); + } + + final ClientProtos.CoprocessorServiceCall call = + ClientProtos.CoprocessorServiceCall.newBuilder() + .setRow(ByteString.copyFrom(row)) + .setServiceName(method.getService().getFullName()) + .setMethodName(method.getName()) + .setRequest(request.toByteString()).build(); + ServerCallable callable = + new ServerCallable(connection, table, row) { + public CoprocessorServiceResponse call() throws Exception { + byte[] regionName = location.getRegionInfo().getRegionName(); + return ProtobufUtil.execService(server, call, regionName); + } + }; + CoprocessorServiceResponse result = callable.withRetries(); + Message response = null; + if (result.getValue().hasValue()) { + response = responsePrototype.newBuilderForType() + .mergeFrom(result.getValue().getValue()).build(); + } else { + response = responsePrototype.getDefaultInstanceForType(); + } + lastRegion = result.getRegion().getValue().toByteArray(); + if (LOG.isTraceEnabled()) { + LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response); + } + return response; + } + + public byte[] getLastRegion() { + return lastRegion; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 758dd3013fd..c82b38f3525 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -40,6 +40,11 @@ import java.util.concurrent.atomic.AtomicReference; import javax.management.ObjectName; +import com.google.common.collect.Maps; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -82,6 +87,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.balancer.BalancerChore; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; @@ -101,7 +107,9 @@ import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; @@ -159,6 +167,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -309,6 +318,8 @@ Server { private SpanReceiverHost spanReceiverHost; + private Map coprocessorServiceHandlers = Maps.newHashMap(); + /** * Initializes the HMaster. The steps are as follows: *

@@ -2274,6 +2285,79 @@ Server { return OfflineRegionResponse.newBuilder().build(); } + @Override + public boolean registerService(Service instance) { + /* + * No stacking of instances is allowed for a single service name + */ + Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType(); + if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) { + LOG.error("Coprocessor service "+serviceDesc.getFullName()+ + " already registered, rejecting request from "+instance + ); + return false; + } + + coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance); + if (LOG.isDebugEnabled()) { + LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName()); + } + return true; + } + + @Override + public ClientProtos.CoprocessorServiceResponse execMasterService(final RpcController controller, + final ClientProtos.CoprocessorServiceRequest request) throws ServiceException { + try { + ServerRpcController execController = new ServerRpcController(); + + ClientProtos.CoprocessorServiceCall call = request.getCall(); + String serviceName = call.getServiceName(); + String methodName = call.getMethodName(); + if (!coprocessorServiceHandlers.containsKey(serviceName)) { + throw new HBaseRPC.UnknownProtocolException(null, + "No registered master coprocessor service found for name "+serviceName); + } + + Service service = coprocessorServiceHandlers.get(serviceName); + Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType(); + Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName); + if (methodDesc == null) { + throw new HBaseRPC.UnknownProtocolException(service.getClass(), + "Unknown method "+methodName+" called on master service "+serviceName); + } + + //invoke the method + Message execRequest = service.getRequestPrototype(methodDesc).newBuilderForType() + .mergeFrom(call.getRequest()).build(); + final Message.Builder responseBuilder = + service.getResponsePrototype(methodDesc).newBuilderForType(); + service.callMethod(methodDesc, controller, execRequest, new RpcCallback() { + @Override + public void run(Message message) { + if (message != null) { + responseBuilder.mergeFrom(message); + } + } + }); + Message execResult = responseBuilder.build(); + + if (execController.getFailedOn() != null) { + throw execController.getFailedOn(); + } + ClientProtos.CoprocessorServiceResponse.Builder builder = + ClientProtos.CoprocessorServiceResponse.newBuilder(); + builder.setRegion(RequestConverter.buildRegionSpecifier( + RegionSpecifierType.REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)); + builder.setValue( + builder.getValueBuilder().setName(execResult.getClass().getName()) + .setValue(execResult.toByteString())); + return builder.build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + /** * Utility for constructing an instance of the passed HMaster class. * @param masterClass diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 2e228a29f33..6548963b6c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -70,6 +70,11 @@ public class MasterCoprocessorHost public MasterEnvironment createEnvironment(final Class implClass, final Coprocessor instance, final int priority, final int seq, final Configuration conf) { + for (Class c : implClass.getInterfaces()) { + if (CoprocessorService.class.isAssignableFrom(c)) { + masterServices.registerService(((CoprocessorService)instance).getService()); + } + } return new MasterEnvironment(implClass, instance, priority, seq, conf, masterServices); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 7ad75d1b786..69f853c8f11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import com.google.protobuf.Service; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; @@ -77,4 +78,23 @@ public interface MasterServices extends Server { * @return true if master enables ServerShutdownHandler; */ public boolean isServerShutdownHandlerEnabled(); + + /** + * Registers a new protocol buffer {@link Service} subclass as a master coprocessor endpoint to + * be available for handling + * {@link org.apache.hadoop.hbase.MasterAdminProtocol#execMasterService(com.google.protobuf.RpcController, + * org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)} calls. + * + *

+ * Only a single instance may be registered for a given {@link Service} subclass (the + * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}. + * After the first registration, subsequent calls with the same service name will fail with + * a return value of {@code false}. + *

+ * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint + * @return {@code true} if the registration was successful, {@code false} + * otherwise + */ + public boolean registerService(Service instance); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 29e66f028a9..41fc4feba9f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MasterAdminProtocol; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.AdminProtocol; @@ -1347,6 +1348,20 @@ public final class ProtobufUtil { } } + public static CoprocessorServiceResponse execService(final MasterAdminProtocol client, + final CoprocessorServiceCall call) throws IOException { + CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() + .setCall(call).setRegion( + RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build(); + try { + CoprocessorServiceResponse response = + client.execMasterService(null, request); + return response; + } catch (ServiceException se) { + throw getRemoteException(se); + } + } + @SuppressWarnings("unchecked") public static T newServiceStub(Class service, RpcChannel channel) throws Exception { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterAdminProtos.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterAdminProtos.java index 13088d923d9..71be12ae060 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterAdminProtos.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterAdminProtos.java @@ -14464,6 +14464,11 @@ public final class MasterAdminProtos { org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest request, com.google.protobuf.RpcCallback done); + public abstract void execMasterService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request, + com.google.protobuf.RpcCallback done); + } public static com.google.protobuf.Service newReflectiveService( @@ -14621,6 +14626,14 @@ public final class MasterAdminProtos { impl.isCatalogJanitorEnabled(controller, request, done); } + @java.lang.Override + public void execMasterService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request, + com.google.protobuf.RpcCallback done) { + impl.execMasterService(controller, request, done); + } + }; } @@ -14681,6 +14694,8 @@ public final class MasterAdminProtos { return impl.enableCatalogJanitor(controller, (org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest)request); case 18: return impl.isCatalogJanitorEnabled(controller, (org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest)request); + case 19: + return impl.execMasterService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request); default: throw new java.lang.AssertionError("Can't get here."); } @@ -14733,6 +14748,8 @@ public final class MasterAdminProtos { return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest.getDefaultInstance(); case 18: return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest.getDefaultInstance(); + case 19: + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); } @@ -14785,6 +14802,8 @@ public final class MasterAdminProtos { return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse.getDefaultInstance(); case 18: return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance(); + case 19: + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); } @@ -14888,6 +14907,11 @@ public final class MasterAdminProtos { org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest request, com.google.protobuf.RpcCallback done); + public abstract void execMasterService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request, + com.google.protobuf.RpcCallback done); + public static final com.google.protobuf.Descriptors.ServiceDescriptor getDescriptor() { @@ -15005,6 +15029,11 @@ public final class MasterAdminProtos { com.google.protobuf.RpcUtil.specializeCallback( done)); return; + case 19: + this.execMasterService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request, + com.google.protobuf.RpcUtil.specializeCallback( + done)); + return; default: throw new java.lang.AssertionError("Can't get here."); } @@ -15057,6 +15086,8 @@ public final class MasterAdminProtos { return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest.getDefaultInstance(); case 18: return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest.getDefaultInstance(); + case 19: + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); } @@ -15109,6 +15140,8 @@ public final class MasterAdminProtos { return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse.getDefaultInstance(); case 18: return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance(); + case 19: + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); } @@ -15414,6 +15447,21 @@ public final class MasterAdminProtos { org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.class, org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance())); } + + public void execMasterService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request, + com.google.protobuf.RpcCallback done) { + channel.callMethod( + getDescriptor().getMethods().get(19), + controller, + request, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance(), + com.google.protobuf.RpcUtil.generalizeCallback( + done, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.class, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance())); + } } public static BlockingInterface newBlockingStub( @@ -15516,6 +15564,11 @@ public final class MasterAdminProtos { com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest request) throws com.google.protobuf.ServiceException; + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse execMasterService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request) + throws com.google.protobuf.ServiceException; } private static final class BlockingStub implements BlockingInterface { @@ -15752,6 +15805,18 @@ public final class MasterAdminProtos { org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse.getDefaultInstance()); } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse execMasterService( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest request) + throws com.google.protobuf.ServiceException { + return (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse) channel.callBlockingMethod( + getDescriptor().getMethods().get(19), + controller, + request, + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse.getDefaultInstance()); + } + } } @@ -15954,79 +16019,81 @@ public final class MasterAdminProtos { descriptor; static { java.lang.String[] descriptorData = { - "\n\021MasterAdmin.proto\032\013hbase.proto\"R\n\020AddC" + - "olumnRequest\022\021\n\ttableName\030\001 \002(\014\022+\n\016colum" + - "nFamilies\030\002 \002(\0132\023.ColumnFamilySchema\"\023\n\021" + - "AddColumnResponse\"<\n\023DeleteColumnRequest" + - "\022\021\n\ttableName\030\001 \002(\014\022\022\n\ncolumnName\030\002 \002(\014\"" + - "\026\n\024DeleteColumnResponse\"U\n\023ModifyColumnR" + - "equest\022\021\n\ttableName\030\001 \002(\014\022+\n\016columnFamil" + - "ies\030\002 \002(\0132\023.ColumnFamilySchema\"\026\n\024Modify" + - "ColumnResponse\"Z\n\021MoveRegionRequest\022 \n\006r" + - "egion\030\001 \002(\0132\020.RegionSpecifier\022#\n\016destSer", - "verName\030\002 \001(\0132\013.ServerName\"\024\n\022MoveRegion" + - "Response\"7\n\023AssignRegionRequest\022 \n\006regio" + - "n\030\001 \002(\0132\020.RegionSpecifier\"\026\n\024AssignRegio" + - "nResponse\"O\n\025UnassignRegionRequest\022 \n\006re" + - "gion\030\001 \002(\0132\020.RegionSpecifier\022\024\n\005force\030\002 " + - "\001(\010:\005false\"\030\n\026UnassignRegionResponse\"8\n\024" + - "OfflineRegionRequest\022 \n\006region\030\001 \002(\0132\020.R" + - "egionSpecifier\"\027\n\025OfflineRegionResponse\"" + - "J\n\022CreateTableRequest\022!\n\013tableSchema\030\001 \002" + - "(\0132\014.TableSchema\022\021\n\tsplitKeys\030\002 \003(\014\"\025\n\023C", - "reateTableResponse\"\'\n\022DeleteTableRequest" + - "\022\021\n\ttableName\030\001 \002(\014\"\025\n\023DeleteTableRespon" + - "se\"\'\n\022EnableTableRequest\022\021\n\ttableName\030\001 " + - "\002(\014\"\025\n\023EnableTableResponse\"(\n\023DisableTab" + - "leRequest\022\021\n\ttableName\030\001 \002(\014\"\026\n\024DisableT" + - "ableResponse\"J\n\022ModifyTableRequest\022\021\n\tta" + - "bleName\030\001 \002(\014\022!\n\013tableSchema\030\002 \002(\0132\014.Tab" + - "leSchema\"\025\n\023ModifyTableResponse\"\021\n\017Shutd" + - "ownRequest\"\022\n\020ShutdownResponse\"\023\n\021StopMa" + - "sterRequest\"\024\n\022StopMasterResponse\"\020\n\016Bal", - "anceRequest\"&\n\017BalanceResponse\022\023\n\013balanc" + - "erRan\030\001 \002(\010\"<\n\031SetBalancerRunningRequest" + - "\022\n\n\002on\030\001 \002(\010\022\023\n\013synchronous\030\002 \001(\010\"6\n\032Set" + - "BalancerRunningResponse\022\030\n\020prevBalanceVa" + - "lue\030\001 \001(\010\"\024\n\022CatalogScanRequest\")\n\023Catal" + - "ogScanResponse\022\022\n\nscanResult\030\001 \001(\005\"-\n\033En" + - "ableCatalogJanitorRequest\022\016\n\006enable\030\001 \002(" + - "\010\"1\n\034EnableCatalogJanitorResponse\022\021\n\tpre" + - "vValue\030\001 \001(\010\" \n\036IsCatalogJanitorEnabledR" + - "equest\"0\n\037IsCatalogJanitorEnabledRespons", - "e\022\r\n\005value\030\001 \002(\0102\263\t\n\022MasterAdminService\022" + - "2\n\taddColumn\022\021.AddColumnRequest\032\022.AddCol" + - "umnResponse\022;\n\014deleteColumn\022\024.DeleteColu" + - "mnRequest\032\025.DeleteColumnResponse\022;\n\014modi" + - "fyColumn\022\024.ModifyColumnRequest\032\025.ModifyC" + - "olumnResponse\0225\n\nmoveRegion\022\022.MoveRegion" + - "Request\032\023.MoveRegionResponse\022;\n\014assignRe" + - "gion\022\024.AssignRegionRequest\032\025.AssignRegio" + - "nResponse\022A\n\016unassignRegion\022\026.UnassignRe" + - "gionRequest\032\027.UnassignRegionResponse\022>\n\r", - "offlineRegion\022\025.OfflineRegionRequest\032\026.O" + - "fflineRegionResponse\0228\n\013deleteTable\022\023.De" + - "leteTableRequest\032\024.DeleteTableResponse\0228" + - "\n\013enableTable\022\023.EnableTableRequest\032\024.Ena" + - "bleTableResponse\022;\n\014disableTable\022\024.Disab" + - "leTableRequest\032\025.DisableTableResponse\0228\n" + - "\013modifyTable\022\023.ModifyTableRequest\032\024.Modi" + - "fyTableResponse\0228\n\013createTable\022\023.CreateT" + - "ableRequest\032\024.CreateTableResponse\022/\n\010shu" + - "tdown\022\020.ShutdownRequest\032\021.ShutdownRespon", - "se\0225\n\nstopMaster\022\022.StopMasterRequest\032\023.S" + - "topMasterResponse\022,\n\007balance\022\017.BalanceRe" + - "quest\032\020.BalanceResponse\022M\n\022setBalancerRu" + - "nning\022\032.SetBalancerRunningRequest\032\033.SetB" + - "alancerRunningResponse\022;\n\016runCatalogScan" + - "\022\023.CatalogScanRequest\032\024.CatalogScanRespo" + - "nse\022S\n\024enableCatalogJanitor\022\034.EnableCata" + - "logJanitorRequest\032\035.EnableCatalogJanitor" + - "Response\022\\\n\027isCatalogJanitorEnabled\022\037.Is" + - "CatalogJanitorEnabledRequest\032 .IsCatalog", - "JanitorEnabledResponseBG\n*org.apache.had" + - "oop.hbase.protobuf.generatedB\021MasterAdmi" + - "nProtosH\001\210\001\001\240\001\001" + "\n\021MasterAdmin.proto\032\013hbase.proto\032\014Client" + + ".proto\"R\n\020AddColumnRequest\022\021\n\ttableName\030" + + "\001 \002(\014\022+\n\016columnFamilies\030\002 \002(\0132\023.ColumnFa" + + "milySchema\"\023\n\021AddColumnResponse\"<\n\023Delet" + + "eColumnRequest\022\021\n\ttableName\030\001 \002(\014\022\022\n\ncol" + + "umnName\030\002 \002(\014\"\026\n\024DeleteColumnResponse\"U\n" + + "\023ModifyColumnRequest\022\021\n\ttableName\030\001 \002(\014\022" + + "+\n\016columnFamilies\030\002 \002(\0132\023.ColumnFamilySc" + + "hema\"\026\n\024ModifyColumnResponse\"Z\n\021MoveRegi" + + "onRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif", + "ier\022#\n\016destServerName\030\002 \001(\0132\013.ServerName" + + "\"\024\n\022MoveRegionResponse\"7\n\023AssignRegionRe" + + "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\"" + + "\026\n\024AssignRegionResponse\"O\n\025UnassignRegio" + + "nRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi" + + "er\022\024\n\005force\030\002 \001(\010:\005false\"\030\n\026UnassignRegi" + + "onResponse\"8\n\024OfflineRegionRequest\022 \n\006re" + + "gion\030\001 \002(\0132\020.RegionSpecifier\"\027\n\025OfflineR" + + "egionResponse\"J\n\022CreateTableRequest\022!\n\013t" + + "ableSchema\030\001 \002(\0132\014.TableSchema\022\021\n\tsplitK", + "eys\030\002 \003(\014\"\025\n\023CreateTableResponse\"\'\n\022Dele" + + "teTableRequest\022\021\n\ttableName\030\001 \002(\014\"\025\n\023Del" + + "eteTableResponse\"\'\n\022EnableTableRequest\022\021" + + "\n\ttableName\030\001 \002(\014\"\025\n\023EnableTableResponse" + + "\"(\n\023DisableTableRequest\022\021\n\ttableName\030\001 \002" + + "(\014\"\026\n\024DisableTableResponse\"J\n\022ModifyTabl" + + "eRequest\022\021\n\ttableName\030\001 \002(\014\022!\n\013tableSche" + + "ma\030\002 \002(\0132\014.TableSchema\"\025\n\023ModifyTableRes" + + "ponse\"\021\n\017ShutdownRequest\"\022\n\020ShutdownResp" + + "onse\"\023\n\021StopMasterRequest\"\024\n\022StopMasterR", + "esponse\"\020\n\016BalanceRequest\"&\n\017BalanceResp" + + "onse\022\023\n\013balancerRan\030\001 \002(\010\"<\n\031SetBalancer" + + "RunningRequest\022\n\n\002on\030\001 \002(\010\022\023\n\013synchronou" + + "s\030\002 \001(\010\"6\n\032SetBalancerRunningResponse\022\030\n" + + "\020prevBalanceValue\030\001 \001(\010\"\024\n\022CatalogScanRe" + + "quest\")\n\023CatalogScanResponse\022\022\n\nscanResu" + + "lt\030\001 \001(\005\"-\n\033EnableCatalogJanitorRequest\022" + + "\016\n\006enable\030\001 \002(\010\"1\n\034EnableCatalogJanitorR" + + "esponse\022\021\n\tprevValue\030\001 \001(\010\" \n\036IsCatalogJ" + + "anitorEnabledRequest\"0\n\037IsCatalogJanitor", + "EnabledResponse\022\r\n\005value\030\001 \002(\0102\201\n\n\022Maste" + + "rAdminService\0222\n\taddColumn\022\021.AddColumnRe" + + "quest\032\022.AddColumnResponse\022;\n\014deleteColum" + + "n\022\024.DeleteColumnRequest\032\025.DeleteColumnRe" + + "sponse\022;\n\014modifyColumn\022\024.ModifyColumnReq" + + "uest\032\025.ModifyColumnResponse\0225\n\nmoveRegio" + + "n\022\022.MoveRegionRequest\032\023.MoveRegionRespon" + + "se\022;\n\014assignRegion\022\024.AssignRegionRequest" + + "\032\025.AssignRegionResponse\022A\n\016unassignRegio" + + "n\022\026.UnassignRegionRequest\032\027.UnassignRegi", + "onResponse\022>\n\rofflineRegion\022\025.OfflineReg" + + "ionRequest\032\026.OfflineRegionResponse\0228\n\013de" + + "leteTable\022\023.DeleteTableRequest\032\024.DeleteT" + + "ableResponse\0228\n\013enableTable\022\023.EnableTabl" + + "eRequest\032\024.EnableTableResponse\022;\n\014disabl" + + "eTable\022\024.DisableTableRequest\032\025.DisableTa" + + "bleResponse\0228\n\013modifyTable\022\023.ModifyTable" + + "Request\032\024.ModifyTableResponse\0228\n\013createT" + + "able\022\023.CreateTableRequest\032\024.CreateTableR" + + "esponse\022/\n\010shutdown\022\020.ShutdownRequest\032\021.", + "ShutdownResponse\0225\n\nstopMaster\022\022.StopMas" + + "terRequest\032\023.StopMasterResponse\022,\n\007balan" + + "ce\022\017.BalanceRequest\032\020.BalanceResponse\022M\n" + + "\022setBalancerRunning\022\032.SetBalancerRunning" + + "Request\032\033.SetBalancerRunningResponse\022;\n\016" + + "runCatalogScan\022\023.CatalogScanRequest\032\024.Ca" + + "talogScanResponse\022S\n\024enableCatalogJanito" + + "r\022\034.EnableCatalogJanitorRequest\032\035.Enable" + + "CatalogJanitorResponse\022\\\n\027isCatalogJanit" + + "orEnabled\022\037.IsCatalogJanitorEnabledReque", + "st\032 .IsCatalogJanitorEnabledResponse\022L\n\021" + + "execMasterService\022\032.CoprocessorServiceRe" + + "quest\032\033.CoprocessorServiceResponseBG\n*or" + + "g.apache.hadoop.hbase.protobuf.generated" + + "B\021MasterAdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -16344,6 +16411,7 @@ public final class MasterAdminProtos { .internalBuildGeneratedFileFrom(descriptorData, new com.google.protobuf.Descriptors.FileDescriptor[] { org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.getDescriptor(), + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.getDescriptor(), }, assigner); } diff --git a/hbase-server/src/main/protobuf/MasterAdmin.proto b/hbase-server/src/main/protobuf/MasterAdmin.proto index f3e193ed58e..dc62bb4f8bb 100644 --- a/hbase-server/src/main/protobuf/MasterAdmin.proto +++ b/hbase-server/src/main/protobuf/MasterAdmin.proto @@ -25,6 +25,7 @@ option java_generate_equals_and_hash = true; option optimize_for = SPEED; import "hbase.proto"; +import "Client.proto"; /* Column-level protobufs */ @@ -273,4 +274,10 @@ service MasterAdminService { */ rpc isCatalogJanitorEnabled(IsCatalogJanitorEnabledRequest) returns(IsCatalogJanitorEnabledResponse); + + /** + * Call a master coprocessor endpoint + */ + rpc execMasterService(CoprocessorServiceRequest) + returns(CoprocessorServiceResponse); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java index ece73515321..c7018e42829 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java @@ -45,6 +45,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; /** * TestEndpoint: test cases to verify coprocessor Endpoint @@ -76,6 +77,8 @@ public class TestCoprocessorEndpoint { org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(), org.apache.hadoop.hbase.coprocessor.GenericEndpoint.class.getName(), ProtobufCoprocessorService.class.getName()); + conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); util.startMiniCluster(2); HBaseAdmin admin = new HBaseAdmin(conf); HTableDescriptor desc = new HTableDescriptor(TEST_TABLE); @@ -253,6 +256,17 @@ public class TestCoprocessorEndpoint { } } + @Test + public void testMasterCoprocessorService() throws Throwable { + HBaseAdmin admin = util.getHBaseAdmin(); + final TestProtos.EchoRequestProto request = + TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build(); + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service = + TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService()); + assertEquals("hello", service.echo(null, request).getMessage()); + admin.close(); + } + private static byte[][] makeN(byte[] base, int n) { byte[][] ret = new byte[n][]; for (int i = 0; i < n; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index 8cf59cf92ae..2d17820a3b2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import com.google.protobuf.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -288,6 +289,11 @@ public class TestCatalogJanitor { public boolean isServerShutdownHandlerEnabled() { return true; } + + @Override + public boolean registerService(Service instance) { + return false; + } } @Test