HBASE-16526 Add more ipc tests

This commit is contained in:
zhangduo 2016-08-31 16:18:26 +08:00
parent c536c85116
commit 45af3831fe
14 changed files with 1485 additions and 716 deletions

View File

@ -214,7 +214,7 @@ public abstract class AbstractRpcClient implements RpcClient {
* new Connection each time.
* @return A pair with the Message response and the Cell data (if any).
*/
Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
private Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
Message param, Message returnType, final User ticket, final InetSocketAddress isa)
throws ServiceException {
if (pcrc == null) {

View File

@ -19,10 +19,17 @@
package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@ -42,11 +49,9 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
@ -55,12 +60,6 @@ import org.apache.hadoop.hbase.util.Threads;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.Descriptors.MethodDescriptor;
@Category(IntegrationTests.class)
public class IntegrationTestRpcClient {
@ -95,29 +94,6 @@ public class IntegrationTestRpcClient {
}
}
static final BlockingService SERVICE =
TestRpcServiceProtos.TestProtobufRpcProto
.newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
@Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
throws ServiceException {
return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
}
});
protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) {
return isSyncClient ?
new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) :
@ -301,14 +277,11 @@ public class IntegrationTestRpcClient {
@Override
public void run() {
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
while (running.get()) {
boolean isBigPayload = random.nextBoolean();
String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build();
EchoResponseProto ret;
TestRpcServer server = cluster.getRandomServer();
try {
User user = User.getCurrent();
@ -317,8 +290,8 @@ public class IntegrationTestRpcClient {
throw new IOException("Listener channel is closed");
}
sending.set(true);
ret = (EchoResponseProto)
rpcClient.callBlockingMethod(md, null, param, ret, user, address);
BlockingInterface stub = newBlockingStub(rpcClient, address, user);
ret = stub.echo(null, param);
} catch (Exception e) {
LOG.warn(e);
continue; // expected in case connection is closing or closed

View File

@ -45,6 +45,22 @@ public final class TestRpcServiceProtos {
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done);
/**
* <code>rpc pause(.PauseRequestProto) returns (.EmptyResponseProto);</code>
*/
public abstract void pause(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done);
/**
* <code>rpc addr(.EmptyRequestProto) returns (.AddrResponseProto);</code>
*/
public abstract void addr(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto> done);
}
public static com.google.protobuf.Service newReflectiveService(
@ -74,6 +90,22 @@ public final class TestRpcServiceProtos {
impl.error(controller, request, done);
}
@java.lang.Override
public void pause(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done) {
impl.pause(controller, request, done);
}
@java.lang.Override
public void addr(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto> done) {
impl.addr(controller, request, done);
}
};
}
@ -102,6 +134,10 @@ public final class TestRpcServiceProtos {
return impl.echo(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto)request);
case 2:
return impl.error(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto)request);
case 3:
return impl.pause(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto)request);
case 4:
return impl.addr(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto)request);
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -122,6 +158,10 @@ public final class TestRpcServiceProtos {
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto.getDefaultInstance();
case 2:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance();
case 3:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.getDefaultInstance();
case 4:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -142,6 +182,10 @@ public final class TestRpcServiceProtos {
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto.getDefaultInstance();
case 2:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance();
case 3:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance();
case 4:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -174,6 +218,22 @@ public final class TestRpcServiceProtos {
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done);
/**
* <code>rpc pause(.PauseRequestProto) returns (.EmptyResponseProto);</code>
*/
public abstract void pause(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done);
/**
* <code>rpc addr(.EmptyRequestProto) returns (.AddrResponseProto);</code>
*/
public abstract void addr(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto> done);
public static final
com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptor() {
@ -211,6 +271,16 @@ public final class TestRpcServiceProtos {
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto>specializeCallback(
done));
return;
case 3:
this.pause(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto)request,
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto>specializeCallback(
done));
return;
case 4:
this.addr(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto)request,
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto>specializeCallback(
done));
return;
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -231,6 +301,10 @@ public final class TestRpcServiceProtos {
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto.getDefaultInstance();
case 2:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance();
case 3:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.getDefaultInstance();
case 4:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -251,6 +325,10 @@ public final class TestRpcServiceProtos {
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto.getDefaultInstance();
case 2:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance();
case 3:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance();
case 4:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
@ -316,6 +394,36 @@ public final class TestRpcServiceProtos {
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.class,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance()));
}
public void pause(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done) {
channel.callMethod(
getDescriptor().getMethods().get(3),
controller,
request,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance(),
com.google.protobuf.RpcUtil.generalizeCallback(
done,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.class,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance()));
}
public void addr(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto> done) {
channel.callMethod(
getDescriptor().getMethods().get(4),
controller,
request,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance(),
com.google.protobuf.RpcUtil.generalizeCallback(
done,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.class,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance()));
}
}
public static BlockingInterface newBlockingStub(
@ -338,6 +446,16 @@ public final class TestRpcServiceProtos {
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request)
throws com.google.protobuf.ServiceException;
public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto pause(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request)
throws com.google.protobuf.ServiceException;
public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto addr(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request)
throws com.google.protobuf.ServiceException;
}
private static final class BlockingStub implements BlockingInterface {
@ -382,6 +500,30 @@ public final class TestRpcServiceProtos {
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance());
}
public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto pause(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request)
throws com.google.protobuf.ServiceException {
return (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto) channel.callBlockingMethod(
getDescriptor().getMethods().get(3),
controller,
request,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance());
}
public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto addr(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request)
throws com.google.protobuf.ServiceException {
return (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto) channel.callBlockingMethod(
getDescriptor().getMethods().get(4),
controller,
request,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance());
}
}
// @@protoc_insertion_point(class_scope:TestProtobufRpcProto)
@ -396,14 +538,16 @@ public final class TestRpcServiceProtos {
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\026test_rpc_service.proto\032\ntest.proto2\250\001\n" +
"\n\026test_rpc_service.proto\032\ntest.proto2\212\002\n" +
"\024TestProtobufRpcProto\022/\n\004ping\022\022.EmptyReq" +
"uestProto\032\023.EmptyResponseProto\022-\n\004echo\022\021" +
".EchoRequestProto\032\022.EchoResponseProto\0220\n" +
"\005error\022\022.EmptyRequestProto\032\023.EmptyRespon" +
"seProtoBL\n.org.apache.hadoop.hbase.ipc.p" +
"rotobuf.generatedB\024TestRpcServiceProtos\210" +
"\001\001\240\001\001"
"seProto\0220\n\005pause\022\022.PauseRequestProto\032\023.E" +
"mptyResponseProto\022.\n\004addr\022\022.EmptyRequest" +
"Proto\032\022.AddrResponseProtoBL\n.org.apache." +
"hadoop.hbase.ipc.protobuf.generatedB\024Tes" +
"tRpcServiceProtos\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View File

@ -33,3 +33,11 @@ message EchoRequestProto {
message EchoResponseProto {
required string message = 1;
}
message PauseRequestProto {
required uint32 ms = 1;
}
message AddrResponseProto {
required string addr = 1;
}

View File

@ -30,4 +30,6 @@ service TestProtobufRpcProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo(EchoRequestProto) returns (EchoResponseProto);
rpc error(EmptyRequestProto) returns (EmptyResponseProto);
rpc pause(PauseRequestProto) returns (EmptyResponseProto);
rpc addr(EmptyRequestProto) returns (AddrResponseProto);
}

View File

@ -23,19 +23,24 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.util.Threads;
import java.io.IOException;
/**
* Test implementation of a coprocessor endpoint exposing the
* {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by
* unit tests only.
* {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by unit tests
* only.
*/
public class ProtobufCoprocessorService
extends TestRpcServiceProtos.TestProtobufRpcProto
public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto
implements CoprocessorService, Coprocessor {
public ProtobufCoprocessorService() {
}
@ -65,6 +70,20 @@ public class ProtobufCoprocessorService
done.run(null);
}
@Override
public void pause(RpcController controller, PauseRequestProto request,
RpcCallback<EmptyResponseProto> done) {
Threads.sleepWithoutInterrupt(request.getMs());
done.run(EmptyResponseProto.getDefaultInstance());
}
@Override
public void addr(RpcController controller, EmptyRequestProto request,
RpcCallback<AddrResponseProto> done) {
done.run(AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress())
.build());
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
// To change body of implemented methods use File | Settings | File Templates.
@ -74,4 +93,5 @@ public class ProtobufCoprocessorService
public void stop(CoprocessorEnvironment env) throws IOException {
// To change body of implemented methods use File | Settings | File Templates.
}
}

View File

@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
@ -27,15 +31,9 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@ -46,25 +44,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
/**
@ -74,61 +64,11 @@ public abstract class AbstractTestIPC {
private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
static byte[] BIG_CELL_BYTES = new byte[10 * 1024];
static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
static final Configuration CONF = HBaseConfiguration.create();
// We are using the test TestRpcServiceProtos generated classes and Service because they are
// available and basic with methods like 'echo', and ping. Below we make a blocking service
// by passing in implementation of blocking interface. We use this service in all tests that
// follow.
static final BlockingService SERVICE =
TestRpcServiceProtos.TestProtobufRpcProto
.newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
@Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
throws ServiceException {
if (controller instanceof PayloadCarryingRpcController) {
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
// If cells, scan them to check we are able to iterate what we were given and since
// this is
// an echo, just put them back on the controller creating a new block. Tests our
// block
// building.
CellScanner cellScanner = pcrc.cellScanner();
List<Cell> list = null;
if (cellScanner != null) {
list = new ArrayList<>();
try {
while (cellScanner.advance()) {
list.add(cellScanner.current());
}
} catch (IOException e) {
throw new ServiceException(e);
}
}
cellScanner = CellUtil.createCellScanner(list);
((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
}
return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
}
});
/**
* Instance of server. We actually don't do anything speical in here so could just use
@ -145,16 +85,9 @@ public abstract class AbstractTestIPC {
}
TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
super(null, "testRpcServer", Lists
.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
"localhost", 0), conf, scheduler);
}
@Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
return super.call(service, md, param, cellScanner, receiveTime, status);
super(null, "testRpcServer",
Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), conf, scheduler);
}
}
@ -162,28 +95,19 @@ public abstract class AbstractTestIPC {
/**
* Ensure we do not HAVE TO HAVE a codec.
* @throws InterruptedException
* @throws IOException
*/
@Test
public void testNoCodec() throws InterruptedException, IOException {
public void testNoCodec() throws IOException, ServiceException {
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
try (AbstractRpcClient client = createRpcClientNoCodec(conf)) {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
Pair<Message, CellScanner> r =
client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
assertTrue(r.getSecond() == null);
// Silly assertion that the message is in the returned pb.
assertTrue(r.getFirst().toString().contains(message));
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
String message = "hello";
assertEquals(message,
stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
assertNull(pcrc.cellScanner());
} finally {
rpcServer.stop();
}
@ -195,14 +119,9 @@ public abstract class AbstractTestIPC {
* It is hard to verify the compression is actually happening under the wraps. Hope that if
* unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
* confirm that compression is happening down in the client and server).
* @throws IOException
* @throws InterruptedException
* @throws SecurityException
* @throws NoSuchMethodException
*/
@Test
public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException,
NoSuchMethodException, ServiceException {
public void testCompressCellBlock() throws IOException, ServiceException {
Configuration conf = new Configuration(HBaseConfiguration.create());
conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
List<Cell> cells = new ArrayList<>();
@ -213,20 +132,17 @@ public abstract class AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
try (AbstractRpcClient client = createRpcClient(conf)) {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
Pair<Message, CellScanner> r =
client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(
CellUtil.createCellScanner(cells));
String message = "hello";
assertEquals(message,
stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
int index = 0;
while (r.getSecond().advance()) {
assertTrue(CELL.equals(r.getSecond().current()));
CellScanner cellScanner = pcrc.cellScanner();
assertNotNull(cellScanner);
while (cellScanner.advance()) {
assertEquals(CELL, cellScanner.current());
index++;
}
assertEquals(count, index);
@ -244,14 +160,8 @@ public abstract class AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
try (AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf)) {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
client.call(null, md, param, null, User.getCurrent(), address,
new MetricsConnection.CallStats());
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.ping(null, EmptyRequestProto.getDefaultInstance());
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString());
@ -261,27 +171,21 @@ public abstract class AbstractTestIPC {
}
}
/** Tests that the rpc scheduler is called when requests arrive. */
/**
* Tests that the rpc scheduler is called when requests arrive.
*/
@Test
public void testRpcScheduler() throws IOException, InterruptedException {
public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
RpcServer rpcServer = new TestRpcServer(scheduler, CONF);
verify(scheduler).init((RpcScheduler.Context) anyObject());
AbstractRpcClient client = createRpcClient(CONF);
try {
try (AbstractRpcClient client = createRpcClient(CONF)) {
rpcServer.start();
verify(scheduler).start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
for (int i = 0; i < 10; i++) {
client.call(new PayloadCarryingRpcController(
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
stub.echo(null, param);
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
} finally {
@ -292,101 +196,93 @@ public abstract class AbstractTestIPC {
/** Tests that the rpc scheduler is called when requests arrive. */
@Test
public void testRpcMaxRequestSize() throws IOException, InterruptedException {
public void testRpcMaxRequestSize() throws IOException, ServiceException {
Configuration conf = new Configuration(CONF);
conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100);
RpcServer rpcServer = new TestRpcServer(conf);
AbstractRpcClient client = createRpcClient(conf);
try {
try (AbstractRpcClient client = createRpcClient(conf)) {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
StringBuilder message = new StringBuilder(120);
for (int i = 0; i < 20; i++) {
message.append("hello.");
}
// set total RPC size bigger than 100 bytes
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello.hello.hello.hello."
+ "hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello").build();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
try {
client.call(new PayloadCarryingRpcController(
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();
stub.echo(
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))),
param);
fail("RPC should have failed because it exceeds max request size");
} catch(IOException ex) {
// pass
}
} catch (ServiceException e) {
LOG.info("Caught expected exception: " + e.toString());
// the rpc server just close the connection so we can not get the detail message.
} finally {
rpcServer.stop();
}
}
/**
* Instance of RpcServer that echoes client hostAddress back to client
*/
static class TestRpcServer1 extends RpcServer {
private static final BlockingInterface SERVICE1 =
new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
@Override
public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request)
throws ServiceException {
return EmptyResponseProto.newBuilder().build();
}
@Override
public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
throws ServiceException {
final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress();
final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress();
return EchoResponseProto.newBuilder().setMessage(message).build();
}
@Override
public EmptyResponseProto error(RpcController unused, EmptyRequestProto request)
throws ServiceException {
throw new ServiceException("error", new IOException("error"));
}
};
TestRpcServer1() throws IOException {
this(new FifoRpcScheduler(CONF, 1));
}
TestRpcServer1(RpcScheduler scheduler) throws IOException {
super(null, "testRemoteAddressInCallObject", Lists
.newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto
.newReflectiveBlockingService(SERVICE1), null)),
new InetSocketAddress("localhost", 0), CONF, scheduler);
}
}
/**
* Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
* remoteAddress set to its Call Object
* @throws ServiceException
*/
@Test
public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException,
ServiceException {
final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
try (AbstractRpcClient client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT,
localAddr, null)) {
public void testRpcServerForNotNullRemoteAddressInCallObject()
throws IOException, ServiceException {
TestRpcServer rpcServer = new TestRpcServer();
InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
try (AbstractRpcClient client = createRpcClient(CONF)) {
rpcServer.start();
final InetSocketAddress isa = rpcServer.getListenerAddress();
if (isa == null) {
throw new IOException("Listener channel is closed");
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
assertEquals(localAddr.getAddress().getHostAddress(),
stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr());
} finally {
rpcServer.stop();
}
}
@Test
public void testRemoteError() throws IOException, ServiceException {
TestRpcServer rpcServer = new TestRpcServer();
try (AbstractRpcClient client = createRpcClient(CONF)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.error(null, EmptyRequestProto.getDefaultInstance());
} catch (ServiceException e) {
LOG.info("Caught expected exception: " + e.getMessage());
IOException ioe = ProtobufUtil.handleRemoteException(e);
assertTrue(ioe instanceof DoNotRetryIOException);
assertTrue(ioe.getMessage().contains("server error!"));
} finally {
rpcServer.stop();
}
}
@Test
public void testTimeout() throws IOException {
TestRpcServer rpcServer = new TestRpcServer();
try (AbstractRpcClient client = createRpcClient(CONF)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
int ms = 1000;
int timeout = 100;
for (int i = 0; i < 10; i++) {
pcrc.reset();
pcrc.setCallTimeout(timeout);
long startTime = System.nanoTime();
try {
stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build());
} catch (ServiceException e) {
long waitTime = (System.nanoTime() - startTime) / 1000000;
// expected
LOG.info("Caught expected exception: " + e.getMessage());
IOException ioe = ProtobufUtil.handleRemoteException(e);
assertTrue(ioe.getCause() instanceof CallTimeoutException);
// confirm that we got exception before the actual pause.
assertTrue(waitTime < ms);
}
}
final BlockingRpcChannel channel = client.createBlockingRpcChannel(
ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0);
BlockingInterface stub = TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
final EchoRequestProto echoRequest =
EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage());
} finally {
rpcServer.stop();
}

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
@ -27,33 +25,12 @@ import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.experimental.categories.Category;
@ -65,8 +42,6 @@ import org.junit.runners.Parameterized.Parameters;
@Category({ RPCTests.class, SmallTests.class })
public class TestAsyncIPC extends AbstractTestIPC {
private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
@Parameters
public static Collection<Object[]> parameters() {
List<Object[]> paramList = new ArrayList<>();
@ -92,8 +67,8 @@ public class TestAsyncIPC extends AbstractTestIPC {
if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) {
if (useNativeTransport
&& !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup)
|| (!useNativeTransport
&& !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) {
|| (!useNativeTransport && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP
.getFirst() instanceof NioEventLoopGroup))) {
AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully();
AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null;
}
@ -135,68 +110,4 @@ public class TestAsyncIPC extends AbstractTestIPC {
}
});
}
public static void main(String[] args) throws IOException, SecurityException,
NoSuchMethodException, InterruptedException {
if (args.length != 2) {
System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>");
return;
}
// ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
// ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
int cycles = Integer.parseInt(args[0]);
int cellcount = Integer.parseInt(args[1]);
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
KeyValue kv = BIG_CELL;
Put p = new Put(CellUtil.cloneRow(kv));
for (int i = 0; i < cellcount; i++) {
p.add(kv);
}
RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
rm.add(p);
try (AsyncRpcClient client = new AsyncRpcClient(conf)) {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
ClientProtos.RegionAction.Builder builder =
RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
MutationProto.newBuilder());
builder.setRegion(RegionSpecifier
.newBuilder()
.setType(RegionSpecifierType.REGION_NAME)
.setValue(
ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
if (i % 100000 == 0) {
LOG.info("" + i);
// Uncomment this for a thread dump every so often.
// ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
// "Thread dump " + Thread.currentThread().getName());
}
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
// Pair<Message, CellScanner> response =
client.call(pcrc, md, builder.build(), param, user, address,
new MetricsConnection.CallStats());
/*
* int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
* count);
*/
}
LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
+ (System.currentTimeMillis() - startTime) + "ms");
} finally {
rpcServer.stop();
}
}
}

View File

@ -22,37 +22,14 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.net.NetUtils;
@ -64,8 +41,6 @@ import org.mockito.stubbing.Answer;
@Category({ RPCTests.class, SmallTests.class })
public class TestIPC extends AbstractTestIPC {
private static final Log LOG = LogFactory.getLog(TestIPC.class);
@Override
protected RpcClientImpl createRpcClientNoCodec(Configuration conf) {
return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
@ -96,71 +71,4 @@ public class TestIPC extends AbstractTestIPC {
return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
}
public static void main(String[] args) throws IOException, SecurityException,
NoSuchMethodException, InterruptedException {
if (args.length != 2) {
System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>");
return;
}
// ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
// ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
int cycles = Integer.parseInt(args[0]);
int cellcount = Integer.parseInt(args[1]);
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
KeyValue kv = BIG_CELL;
Put p = new Put(CellUtil.cloneRow(kv));
for (int i = 0; i < cellcount; i++) {
p.add(kv);
}
RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
rm.add(p);
try {
rpcServer.start();
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
ClientProtos.RegionAction.Builder builder =
RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
MutationProto.newBuilder());
builder.setRegion(RegionSpecifier
.newBuilder()
.setType(RegionSpecifierType.REGION_NAME)
.setValue(
ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
if (i % 100000 == 0) {
LOG.info("" + i);
// Uncomment this for a thread dump every so often.
// ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
// "Thread dump " + Thread.currentThread().getName());
}
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
// Pair<Message, CellScanner> response =
client.call(pcrc, md, builder.build(), param, user, address,
new MetricsConnection.CallStats());
/*
* int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
* count);
*/
}
LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
+ (System.currentTimeMillis() - startTime) + "ms");
} finally {
client.close();
rpcServer.stop();
}
}
}

View File

@ -17,40 +17,37 @@
*/
package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.InetSocketAddress;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Test for testing protocol buffer based RPC mechanism.
* This test depends on test.proto definition of types in <code>src/test/protobuf/test.proto</code>
* and protobuf service definition from <code>src/test/protobuf/test_rpc_service.proto</code>
* Test for testing protocol buffer based RPC mechanism. This test depends on test.proto definition
* of types in <code>src/test/protobuf/test.proto</code> and protobuf service definition from
* <code>src/test/protobuf/test_rpc_service.proto</code>
*/
@Category({ RPCTests.class, MediumTests.class })
public class TestProtoBufRpc {
@ -60,31 +57,6 @@ public class TestProtoBufRpc {
private Configuration conf;
private RpcServerInterface server;
/**
* Implementation of the test service defined out in TestRpcServiceProtos
*/
static class PBServerImpl
implements TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface {
@Override
public EmptyResponseProto ping(RpcController unused,
EmptyRequestProto request) throws ServiceException {
return EmptyResponseProto.newBuilder().build();
}
@Override
public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
throws ServiceException {
return EchoResponseProto.newBuilder().setMessage(request.getMessage())
.build();
}
@Override
public EmptyResponseProto error(RpcController unused,
EmptyRequestProto request) throws ServiceException {
throw new ServiceException("error", new IOException("error"));
}
}
@Before
public void setUp() throws IOException { // Setup server for both protocols
this.conf = HBaseConfiguration.create();
@ -93,14 +65,10 @@ public class TestProtoBufRpc {
log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer.trace");
log.setLevel(Level.TRACE);
// Create server side implementation
PBServerImpl serverImpl = new PBServerImpl();
BlockingService service =
TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(serverImpl);
// Get RPC server for server side implementation
this.server = new RpcServer(null, "testrpc",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
new InetSocketAddress(ADDRESS, PORT), conf,
new FifoRpcScheduler(conf, 10));
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10));
InetSocketAddress address = server.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
@ -118,25 +86,20 @@ public class TestProtoBufRpc {
public void testProtoBufRpc() throws Exception {
RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
try {
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0);
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
BlockingInterface stub = newBlockingStub(rpcClient, this.isa);
// Test ping method
TestProtos.EmptyRequestProto emptyRequest =
TestProtos.EmptyRequestProto.newBuilder().build();
TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.newBuilder().build();
stub.ping(null, emptyRequest);
// Test echo method
EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("hello").build();
EchoResponseProto echoResponse = stub.echo(null, echoRequest);
Assert.assertEquals(echoResponse.getMessage(), "hello");
assertEquals(echoResponse.getMessage(), "hello");
// Test error method - error should be thrown as RemoteException
try {
stub.error(null, emptyRequest);
Assert.fail("Expected exception is not thrown");
fail("Expected exception is not thrown");
} catch (ServiceException e) {
}
} finally {

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Threads;
@InterfaceAudience.Private
public class TestProtobufRpcServiceImpl implements BlockingInterface {
public static final BlockingService SERVICE = TestProtobufRpcProto
.newReflectiveBlockingService(new TestProtobufRpcServiceImpl());
public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr)
throws IOException {
return newBlockingStub(client, addr, User.getCurrent());
}
public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr,
User user) throws IOException {
return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel(
ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), user, 0));
}
public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException {
return TestProtobufRpcProto.newStub(client.createProtobufRpcChannel(
ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0));
}
@Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return EmptyResponseProto.getDefaultInstance();
}
@Override
public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
throws ServiceException {
if (controller instanceof PayloadCarryingRpcController) {
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
// If cells, scan them to check we are able to iterate what we were given and since this is an
// echo, just put them back on the controller creating a new block. Tests our block building.
CellScanner cellScanner = pcrc.cellScanner();
List<Cell> list = null;
if (cellScanner != null) {
list = new ArrayList<>();
try {
while (cellScanner.advance()) {
list.add(cellScanner.current());
}
} catch (IOException e) {
throw new ServiceException(e);
}
}
cellScanner = CellUtil.createCellScanner(list);
((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
}
return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
}
@Override
public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
throws ServiceException {
throw new ServiceException(new DoNotRetryIOException("server error!"));
}
@Override
public EmptyResponseProto pause(RpcController controller, PauseRequestProto request)
throws ServiceException {
Threads.sleepWithoutInterrupt(request.getMs());
return EmptyResponseProto.getDefaultInstance();
}
@Override
public AddrResponseProto addr(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress())
.build();
}
}

View File

@ -17,108 +17,31 @@
*/
package org.apache.hadoop.hbase.ipc;
import com.google.common.collect.ImmutableList;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.mockito.Mockito.mock;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RPCTests.class, SmallTests.class })
public class TestRpcHandlerException {
private static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
static String example = "xyz";
static byte[] CELL_BYTES = example.getBytes();
static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
private final static Configuration CONF = HBaseConfiguration.create();
RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
// We are using the test TestRpcServiceProtos generated classes and Service because they are
// available and basic with methods like 'echo', and ping. Below we make a blocking service
// by passing in implementation of blocking interface. We use this service in all tests that
// follow.
private static final BlockingService SERVICE =
TestRpcServiceProtos.TestProtobufRpcProto
.newReflectiveBlockingService(new TestRpcServiceProtos
.TestProtobufRpcProto.BlockingInterface() {
@Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
throws Error, RuntimeException {
if (controller instanceof PayloadCarryingRpcController) {
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
// If cells, scan them to check we are able to iterate what we were given and since
// this is
// an echo, just put them back on the controller creating a new block. Tests our
// block
// building.
CellScanner cellScanner = pcrc.cellScanner();
List<Cell> list = null;
if (cellScanner != null) {
list = new ArrayList<Cell>();
try {
while (cellScanner.advance()) {
list.add(cellScanner.current());
throw new StackOverflowError();
}
} catch (StackOverflowError e) {
throw e;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
cellScanner = CellUtil.createCellScanner(list);
((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
}
return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
}
});
/**
* Instance of server. We actually don't do anything speical in here so could just use
@ -126,29 +49,18 @@ public class TestRpcHandlerException {
*/
private static class TestRpcServer extends RpcServer {
TestRpcServer() throws IOException {
this(new FifoRpcScheduler(CONF, 1));
}
TestRpcServer(RpcScheduler scheduler) throws IOException {
super(null, "testRpcServer",
Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, scheduler);
}
@Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
return super.call(service, md, param, cellScanner, receiveTime, status);
}
}
/** Tests that the rpc scheduler is called when requests arrive.
* When Rpc handler thread dies, the client will hang and the test will fail.
* The test is meant to be a unit test to test the behavior.
*
* */
/**
* Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the
* client will hang and the test will fail. The test is meant to be a unit test to test the
* behavior.
*/
private class AbortServer implements Abortable {
private boolean aborted = false;
@ -163,7 +75,8 @@ public class TestRpcHandlerException {
}
}
/* This is a unit test to make sure to abort region server when the number of Rpc handler thread
/*
* This is a unit test to make sure to abort region server when the number of Rpc handler thread
* caught errors exceeds the threshold. Client will hang when RS aborts.
*/
@Ignore
@ -173,19 +86,10 @@ public class TestRpcHandlerException {
Abortable abortable = new AbortServer();
RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
RpcServer rpcServer = new TestRpcServer(scheduler);
RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
try {
try (RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT)) {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
PayloadCarryingRpcController controller =
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
address, new MetricsConnection.CallStats());
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
} catch (Throwable e) {
assert (abortable.isAborted() == true);
} finally {

View File

@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
@ -25,6 +27,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -32,27 +37,21 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import javax.security.sasl.SaslException;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -64,12 +63,6 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.BlockingService;
import javax.security.sasl.SaslException;
public abstract class AbstractTestSecureIPC {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -77,55 +70,6 @@ public abstract class AbstractTestSecureIPC {
private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
.getPath());
static final BlockingService SERVICE =
TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
@Override
public TestProtos.EmptyResponseProto ping(RpcController controller,
TestProtos.EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public TestProtos.EmptyResponseProto error(RpcController controller,
TestProtos.EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public TestProtos.EchoResponseProto echo(RpcController controller,
TestProtos.EchoRequestProto request)
throws ServiceException {
if (controller instanceof PayloadCarryingRpcController) {
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
// If cells, scan them to check we are able to iterate what we were given and since
// this is
// an echo, just put them back on the controller creating a new block. Tests our
// block
// building.
CellScanner cellScanner = pcrc.cellScanner();
List<Cell> list = null;
if (cellScanner != null) {
list = new ArrayList<Cell>();
try {
while (cellScanner.advance()) {
list.add(cellScanner.current());
}
} catch (IOException e) {
throw new ServiceException(e);
}
}
cellScanner = CellUtil.createCellScanner(list);
((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
}
return TestProtos.EchoResponseProto.newBuilder()
.setMessage(request.getMessage()).build();
}
});
private static MiniKdc KDC;
private static String HOST = "localhost";
private static String PRINCIPAL;
@ -262,16 +206,8 @@ public abstract class AbstractTestSecureIPC {
rpcServer.start();
try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
HConstants.DEFAULT_CLUSTER_ID.toString())) {
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
BlockingRpcChannel channel =
rpcClient.createBlockingRpcChannel(
ServerName.valueOf(address.getHostName(), address.getPort(),
System.currentTimeMillis()), clientUser, 0);
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress(),
clientUser);
List<String> results = new ArrayList<>();
TestThread th1 = new TestThread(stub, results);
final Throwable exception[] = new Throwable[1];
@ -298,11 +234,11 @@ public abstract class AbstractTestSecureIPC {
}
public static class TestThread extends Thread {
private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
private final BlockingInterface stub;
private final List<String> results;
public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) {
public TestThread(BlockingInterface stub, List<String> results) {
this.stub = stub;
this.results = results;
}