HBASE-13646 HRegion#execService should not try to build incomplete messages

This commit is contained in:
Andrey Stepachev 2015-05-14 13:21:48 +01:00
parent f7c3a3dd55
commit 9ba7337ac8
9 changed files with 151 additions and 22 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.protobuf;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -323,6 +324,25 @@ public final class ResponseConverter {
}
}
/**
 * Retrieves an exception stored during an RPC invocation, if any.
 * @param controller the controller instance provided by the client when calling the service
 * @return the stored exception, or null when none failed; non-{@link ServerRpcController}
 *         controllers only carry a failure string, which is wrapped in a
 *         DoNotRetryIOException.
 */
@Nullable
public static IOException getControllerException(RpcController controller) throws IOException {
  // Nothing to report unless a controller exists and recorded a failure.
  if (controller == null || !controller.failed()) {
    return null;
  }
  // ServerRpcController keeps the original IOException; surface it unchanged.
  if (controller instanceof ServerRpcController) {
    return ((ServerRpcController) controller).getFailedOn();
  }
  // Other controllers expose only the error text, so wrap it as non-retriable.
  return new DoNotRetryIOException(controller.errorText());
}
/**
* Create Results from the cells using the cells meta data.
* @param cellScanner

View File

@ -791,6 +791,7 @@
<include>ColumnAggregationProtocol.proto</include>
<include>IncrementCounterProcessor.proto</include>
<include>PingProtocol.proto</include>
<include>DummyRegionServerEndpoint.proto</include>
<include>test.proto</include>
<include>test_delayed_rpc.proto</include>
<include>test_rpc_service.proto</include>

View File

@ -128,6 +128,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@ -7200,8 +7201,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
LOG.error("Coprocessor service "+serviceDesc.getFullName()+
" already registered, rejecting request from "+instance
LOG.error("Coprocessor service " + serviceDesc.getFullName() +
" already registered, rejecting request from " + instance
);
return false;
}
@ -7257,6 +7258,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
}
IOException exception = ResponseConverter.getControllerException(controller);
if (exception != null) {
throw exception;
}
return responseBuilder.build();
}

View File

@ -106,6 +106,7 @@ import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
@ -3201,10 +3202,11 @@ public class HRegionServer extends HasThread implements
return result;
}
public CoprocessorServiceResponse execRegionServerService(final RpcController controller,
public CoprocessorServiceResponse execRegionServerService(
@SuppressWarnings("UnusedParameters") final RpcController controller,
final CoprocessorServiceRequest serviceRequest) throws ServiceException {
try {
ServerRpcController execController = new ServerRpcController();
ServerRpcController serviceController = new ServerRpcController();
CoprocessorServiceCall call = serviceRequest.getCall();
String serviceName = call.getServiceName();
String methodName = call.getMethodName();
@ -3224,7 +3226,7 @@ public class HRegionServer extends HasThread implements
.build();
final Message.Builder responseBuilder =
service.getResponsePrototype(methodDesc).newBuilderForType();
service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
service.callMethod(methodDesc, serviceController, request, new RpcCallback<Message>() {
@Override
public void run(Message message) {
if (message != null) {
@ -3232,10 +3234,11 @@ public class HRegionServer extends HasThread implements
}
}
});
Message execResult = responseBuilder.build();
if (execController.getFailedOn() != null) {
throw execController.getFailedOn();
IOException exception = ResponseConverter.getControllerException(serviceController);
if (exception != null) {
throw exception;
}
Message execResult = responseBuilder.build();
ClientProtos.CoprocessorServiceResponse.Builder builder =
ClientProtos.CoprocessorServiceResponse.newBuilder();
builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,

View File

@ -1831,11 +1831,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
// ignore the passed in controller (from the serialized call)
ServerRpcController execController = new ServerRpcController();
Message result = region.execService(execController, serviceCall);
if (execController.getFailedOn() != null) {
throw execController.getFailedOn();
}
return result;
return region.execService(execController, serviceCall);
}
/**

View File

@ -87,6 +87,7 @@ public class TestCoprocessorEndpoint {
public static void setupBeforeClass() throws Exception {
// set configure to indicate which cp should be loaded
Configuration conf = util.getConfiguration();
conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000);
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
ProtobufCoprocessorService.class.getName());

View File

@ -17,8 +17,7 @@
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertEquals;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
@ -26,15 +25,15 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
@ -45,8 +44,12 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category({CoprocessorTests.class, MediumTests.class})
public class TestRegionServerCoprocessorEndpoint {
public static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt");
private static HBaseTestingUtility TEST_UTIL = null;
private static Configuration CONF = null;
private static final String DUMMY_VALUE = "val";
@ -75,13 +78,30 @@ public class TestRegionServerCoprocessorEndpoint {
ProtobufUtil.newServiceStub(DummyRegionServerEndpointProtos.DummyService.class,
TEST_UTIL.getHBaseAdmin().coprocessorService(serverName));
service.dummyCall(controller,
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(), rpcCallback);
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(), rpcCallback);
assertEquals(DUMMY_VALUE, rpcCallback.get().getValue());
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
}
/**
 * Verifies that an exception raised inside a region-server coprocessor endpoint is
 * propagated to the client through the RPC controller (as a remote exception cause)
 * instead of being masked by a half-built response message.
 */
@Test
public void testEndpointExceptions() throws Exception {
  final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
  final ServerRpcController controller = new ServerRpcController();
  final BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse> rpcCallback =
      new BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>();
  DummyRegionServerEndpointProtos.DummyService service =
      ProtobufUtil.newServiceStub(DummyRegionServerEndpointProtos.DummyService.class,
          TEST_UTIL.getHBaseAdmin().coprocessorService(serverName));
  service.dummyThrow(controller,
      DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(), rpcCallback);
  // The endpoint never runs the callback, so no response must have been produced.
  assertEquals(null, rpcCallback.get());
  assertTrue(controller.failedOnException());
  // Class#getName() never carries surrounding whitespace, so no trim() is needed here.
  assertEquals(WHAT_TO_THROW.getClass().getName(),
      ((RemoteWithExtrasException) controller.getFailedOn().getCause()).getClassName());
}
static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService {
@Override
@ -104,5 +124,13 @@ public class TestRegionServerCoprocessorEndpoint {
RpcCallback<DummyResponse> callback) {
callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
}
@Override
public void dummyThrow(RpcController controller,
    DummyRequest request,
    RpcCallback<DummyResponse> done) {
  // Deliberately fail: record the canned exception on the controller and never
  // invoke the callback, so the caller observes a null response plus a failed
  // controller (exercised by testEndpointExceptions).
  ResponseConverter.setControllerException(controller, WHAT_TO_THROW);
}
}
}

View File

@ -879,6 +879,14 @@ public final class DummyRegionServerEndpointProtos {
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse> done);
/**
* <code>rpc dummyThrow(.DummyRequest) returns (.DummyResponse);</code>
*/
public abstract void dummyThrow(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse> done);
}
public static com.google.protobuf.Service newReflectiveService(
@ -892,6 +900,14 @@ public final class DummyRegionServerEndpointProtos {
impl.dummyCall(controller, request, done);
}
@java.lang.Override
public void dummyThrow(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse> done) {
impl.dummyThrow(controller, request, done);
}
};
}
@ -916,6 +932,8 @@ public final class DummyRegionServerEndpointProtos {
switch(method.getIndex()) {
case 0:
return impl.dummyCall(controller, (org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest)request);
case 1:
return impl.dummyThrow(controller, (org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest)request);
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -932,6 +950,8 @@ public final class DummyRegionServerEndpointProtos {
switch(method.getIndex()) {
case 0:
return org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
case 1:
return org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -948,6 +968,8 @@ public final class DummyRegionServerEndpointProtos {
switch(method.getIndex()) {
case 0:
return org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.getDefaultInstance();
case 1:
return org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -964,6 +986,14 @@ public final class DummyRegionServerEndpointProtos {
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse> done);
/**
* <code>rpc dummyThrow(.DummyRequest) returns (.DummyResponse);</code>
*/
public abstract void dummyThrow(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse> done);
public static final
com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptor() {
@ -991,6 +1021,11 @@ public final class DummyRegionServerEndpointProtos {
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse>specializeCallback(
done));
return;
case 1:
this.dummyThrow(controller, (org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest)request,
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse>specializeCallback(
done));
return;
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -1007,6 +1042,8 @@ public final class DummyRegionServerEndpointProtos {
switch(method.getIndex()) {
case 0:
return org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
case 1:
return org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -1023,6 +1060,8 @@ public final class DummyRegionServerEndpointProtos {
switch(method.getIndex()) {
case 0:
return org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.getDefaultInstance();
case 1:
return org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -1058,6 +1097,21 @@ public final class DummyRegionServerEndpointProtos {
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.class,
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.getDefaultInstance()));
}
// Generated protobuf stub (see the @@protoc_insertion_point marker below): dispatches
// dummyThrow — method index 1 of DummyService — over the underlying RpcChannel.
public void dummyThrow(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse> done) {
  channel.callMethod(
      getDescriptor().getMethods().get(1),
      controller,
      request,
      org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.getDefaultInstance(),
      com.google.protobuf.RpcUtil.generalizeCallback(
          done,
          org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.class,
          org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.getDefaultInstance()));
}
}
public static BlockingInterface newBlockingStub(
@ -1070,6 +1124,11 @@ public final class DummyRegionServerEndpointProtos {
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest request)
throws com.google.protobuf.ServiceException;
public org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse dummyThrow(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest request)
throws com.google.protobuf.ServiceException;
}
private static final class BlockingStub implements BlockingInterface {
@ -1090,6 +1149,18 @@ public final class DummyRegionServerEndpointProtos {
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.getDefaultInstance());
}
// Generated protobuf blocking stub: synchronous variant of dummyThrow (method index 1),
// returning the response directly or wrapping failures in ServiceException.
public org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse dummyThrow(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest request)
    throws com.google.protobuf.ServiceException {
  return (org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse) channel.callBlockingMethod(
      getDescriptor().getMethods().get(1),
      controller,
      request,
      org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse.getDefaultInstance());
}
}
// @@protoc_insertion_point(class_scope:DummyService)
@ -1116,10 +1187,12 @@ public final class DummyRegionServerEndpointProtos {
java.lang.String[] descriptorData = {
"\n\037DummyRegionServerEndpoint.proto\"\016\n\014Dum" +
"myRequest\"\036\n\rDummyResponse\022\r\n\005value\030\001 \002(" +
"\t2:\n\014DummyService\022*\n\tdummyCall\022\r.DummyRe" +
"quest\032\016.DummyResponseB_\n6org.apache.hado" +
"op.hbase.coprocessor.protobuf.generatedB" +
"\037DummyRegionServerEndpointProtos\210\001\001\240\001\001"
"\t2g\n\014DummyService\022*\n\tdummyCall\022\r.DummyRe" +
"quest\032\016.DummyResponse\022+\n\ndummyThrow\022\r.Du" +
"mmyRequest\032\016.DummyResponseB_\n6org.apache" +
".hadoop.hbase.coprocessor.protobuf.gener" +
"atedB\037DummyRegionServerEndpointProtos\210\001\001" +
"\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View File

@ -30,4 +30,5 @@ message DummyResponse {
// Test-only endpoint service. dummyCall returns a fixed value; dummyThrow is
// implemented to always fail server-side so error propagation can be verified.
service DummyService {
  rpc dummyCall(DummyRequest) returns(DummyResponse);
  // Always raises an exception on the server; no response is ever produced.
  rpc dummyThrow(DummyRequest) returns(DummyResponse);
}