HBASE-17262 Refactor RpcServer so as to make it extendable and/or pluggable
This commit is contained in:
parent
d787155fd2
commit
fc93de51af
|
@ -25,11 +25,6 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
|
@ -45,20 +40,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
@Category(IntegrationTests.class)
|
||||
public class IntegrationTestRpcClient {
|
||||
|
||||
|
@ -72,26 +67,6 @@ public class IntegrationTestRpcClient {
|
|||
conf = HBaseConfiguration.create();
|
||||
}
|
||||
|
||||
static class TestRpcServer extends RpcServer {
|
||||
|
||||
TestRpcServer(Configuration conf) throws IOException {
|
||||
this(new FifoRpcScheduler(conf, 1), conf);
|
||||
}
|
||||
|
||||
TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
|
||||
super(null, "testRpcServer", Lists
|
||||
.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
|
||||
"localhost", 0), conf, scheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
|
||||
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
|
||||
throws IOException {
|
||||
return super.call(service, md, param, cellScanner, receiveTime, status);
|
||||
}
|
||||
}
|
||||
|
||||
protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
|
||||
return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
|
||||
@Override
|
||||
|
@ -116,8 +91,8 @@ public class IntegrationTestRpcClient {
|
|||
class Cluster {
|
||||
Random random = new Random();
|
||||
ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>();
|
||||
List<TestRpcServer> serverList = new ArrayList<>();
|
||||
HashMap<InetSocketAddress, RpcServer> rpcServers = new HashMap<>();
|
||||
List<RpcServer> serverList = new ArrayList<>();
|
||||
int maxServers;
|
||||
int minServers;
|
||||
|
||||
|
@ -126,14 +101,18 @@ public class IntegrationTestRpcClient {
|
|||
this.maxServers = maxServers;
|
||||
}
|
||||
|
||||
TestRpcServer startServer() throws IOException {
|
||||
RpcServer startServer() throws IOException {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
if (rpcServers.size() >= maxServers) {
|
||||
return null;
|
||||
}
|
||||
|
||||
TestRpcServer rpcServer = new TestRpcServer(conf);
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists
|
||||
.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(
|
||||
conf, 1));
|
||||
rpcServer.start();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
|
@ -150,7 +129,7 @@ public class IntegrationTestRpcClient {
|
|||
|
||||
void stopRandomServer() throws Exception {
|
||||
lock.writeLock().lock();
|
||||
TestRpcServer rpcServer = null;
|
||||
RpcServer rpcServer = null;
|
||||
try {
|
||||
if (rpcServers.size() <= minServers) {
|
||||
return;
|
||||
|
@ -174,7 +153,7 @@ public class IntegrationTestRpcClient {
|
|||
}
|
||||
}
|
||||
|
||||
void stopServer(TestRpcServer rpcServer) throws InterruptedException {
|
||||
void stopServer(RpcServer rpcServer) throws InterruptedException {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
LOG.info("Stopping server: " + address);
|
||||
rpcServer.stop();
|
||||
|
@ -185,7 +164,7 @@ public class IntegrationTestRpcClient {
|
|||
void stopRunning() throws InterruptedException {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
for (TestRpcServer rpcServer : serverList) {
|
||||
for (RpcServer rpcServer : serverList) {
|
||||
stopServer(rpcServer);
|
||||
}
|
||||
|
||||
|
@ -194,7 +173,7 @@ public class IntegrationTestRpcClient {
|
|||
}
|
||||
}
|
||||
|
||||
TestRpcServer getRandomServer() {
|
||||
RpcServer getRandomServer() {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
int size = rpcServers.size();
|
||||
|
@ -278,7 +257,7 @@ public class IntegrationTestRpcClient {
|
|||
String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
|
||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
|
||||
EchoResponseProto ret;
|
||||
TestRpcServer server = cluster.getRandomServer();
|
||||
RpcServer server = cluster.getRandomServer();
|
||||
try {
|
||||
sending.set(true);
|
||||
BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
package org.apache.hadoop.hbase.ipc;
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -16,6 +15,8 @@ package org.apache.hadoop.hbase.ipc;
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class RpcServerFactory {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(RpcServerFactory.class);
|
||||
|
||||
public static final String CUSTOM_RPC_SERVER_IMPL_CONF_KEY = "hbase.rpc.server.impl";
|
||||
|
||||
/**
|
||||
* Private Constructor
|
||||
*/
|
||||
private RpcServerFactory() {
|
||||
}
|
||||
|
||||
public static RpcServer createRpcServer(final Server server, final String name,
|
||||
final List<BlockingServiceAndInterface> services,
|
||||
final InetSocketAddress bindAddress, Configuration conf,
|
||||
RpcScheduler scheduler) throws IOException {
|
||||
String rpcServerClass = conf.get(CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
|
||||
SimpleRpcServer.class.getName());
|
||||
LOG.info("Use " + rpcServerClass + " rpc server");
|
||||
return ReflectionUtils.instantiateWithCustomCtor(rpcServerClass,
|
||||
new Class[] { Server.class, String.class, List.class,
|
||||
InetSocketAddress.class, Configuration.class, RpcScheduler.class },
|
||||
new Object[] { server, name, services, bindAddress, conf, scheduler });
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.ipc.RpcCallContext;
|
|||
import org.apache.hadoop.hbase.ipc.RpcCallback;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
|
@ -1060,7 +1061,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
// Set how many times to retry talking to another server over Connection.
|
||||
ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
|
||||
try {
|
||||
rpcServer = new RpcServer(rs, name, getServices(),
|
||||
rpcServer = RpcServerFactory.createRpcServer(rs, name, getServices(),
|
||||
bindAddress, // use final bindAddress for this server.
|
||||
rs.conf,
|
||||
rpcSchedulerFactory.create(rs.conf, this, rs));
|
||||
|
|
|
@ -46,7 +46,9 @@ import org.apache.hadoop.hbase.CellUtil;
|
|||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
|
||||
|
@ -55,7 +57,6 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseReq
|
|||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -77,27 +78,6 @@ public abstract class AbstractTestIPC {
|
|||
static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
|
||||
static final Configuration CONF = HBaseConfiguration.create();
|
||||
|
||||
/**
|
||||
* Instance of server. We actually don't do anything speical in here so could just use
|
||||
* HBaseRpcServer directly.
|
||||
*/
|
||||
static class TestRpcServer extends RpcServer {
|
||||
|
||||
TestRpcServer() throws IOException {
|
||||
this(new FifoRpcScheduler(CONF, 1), CONF);
|
||||
}
|
||||
|
||||
TestRpcServer(Configuration conf) throws IOException {
|
||||
this(new FifoRpcScheduler(conf, 1), conf);
|
||||
}
|
||||
|
||||
TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
|
||||
super(null, "testRpcServer",
|
||||
Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), conf, scheduler);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
|
||||
|
||||
/**
|
||||
|
@ -106,7 +86,10 @@ public abstract class AbstractTestIPC {
|
|||
@Test
|
||||
public void testNoCodec() throws IOException, ServiceException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
|
@ -136,7 +119,10 @@ public abstract class AbstractTestIPC {
|
|||
for (int i = 0; i < count; i++) {
|
||||
cells.add(CELL);
|
||||
}
|
||||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
|
@ -163,7 +149,10 @@ public abstract class AbstractTestIPC {
|
|||
@Test
|
||||
public void testRTEDuringConnectionSetup() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
|
@ -183,7 +172,10 @@ public abstract class AbstractTestIPC {
|
|||
@Test
|
||||
public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
|
||||
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
|
||||
RpcServer rpcServer = new TestRpcServer(scheduler, CONF);
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer",
|
||||
Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, scheduler);
|
||||
verify(scheduler).init((RpcScheduler.Context) anyObject());
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
rpcServer.start();
|
||||
|
@ -205,7 +197,10 @@ public abstract class AbstractTestIPC {
|
|||
public void testRpcMaxRequestSize() throws IOException, ServiceException {
|
||||
Configuration conf = new Configuration(CONF);
|
||||
conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
|
||||
RpcServer rpcServer = new TestRpcServer(conf);
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), conf,
|
||||
new FifoRpcScheduler(conf, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
|
@ -236,7 +231,10 @@ public abstract class AbstractTestIPC {
|
|||
@Test
|
||||
public void testRpcServerForNotNullRemoteAddressInCallObject()
|
||||
throws IOException, ServiceException {
|
||||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
rpcServer.start();
|
||||
|
@ -250,7 +248,10 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testRemoteError() throws IOException, ServiceException {
|
||||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
|
@ -267,7 +268,10 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testTimeout() throws IOException {
|
||||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
|
@ -295,7 +299,7 @@ public abstract class AbstractTestIPC {
|
|||
}
|
||||
}
|
||||
|
||||
static class TestFailingRpcServer extends TestRpcServer {
|
||||
static class TestFailingRpcServer extends SimpleRpcServer {
|
||||
|
||||
TestFailingRpcServer() throws IOException {
|
||||
this(new FifoRpcScheduler(CONF, 1), CONF);
|
||||
|
@ -306,7 +310,9 @@ public abstract class AbstractTestIPC {
|
|||
}
|
||||
|
||||
TestFailingRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
|
||||
super(scheduler, conf);
|
||||
super(null, "testRpcServer", Lists
|
||||
.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), conf, scheduler);
|
||||
}
|
||||
|
||||
class FailingConnection extends Connection {
|
||||
|
@ -349,7 +355,10 @@ public abstract class AbstractTestIPC {
|
|||
@Test
|
||||
public void testAsyncEcho() throws IOException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
||||
rpcServer.start();
|
||||
Interface stub = newStub(client, rpcServer.getListenerAddress());
|
||||
|
@ -377,7 +386,10 @@ public abstract class AbstractTestIPC {
|
|||
@Test
|
||||
public void testAsyncRemoteError() throws IOException {
|
||||
AbstractRpcClient<?> client = createRpcClient(CONF);
|
||||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
try {
|
||||
rpcServer.start();
|
||||
Interface stub = newStub(client, rpcServer.getListenerAddress());
|
||||
|
@ -398,7 +410,10 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testAsyncTimeout() throws IOException {
|
||||
TestRpcServer rpcServer = new TestRpcServer();
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
|
||||
"testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
rpcServer.start();
|
||||
Interface stub = newStub(client, rpcServer.getListenerAddress());
|
||||
|
|
|
@ -65,7 +65,7 @@ public class TestProtoBufRpc {
|
|||
log.setLevel(Level.TRACE);
|
||||
// Create server side implementation
|
||||
// Get RPC server for server side implementation
|
||||
this.server = new RpcServer(null, "testrpc",
|
||||
this.server = RpcServerFactory.createRpcServer(null, "testrpc",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10));
|
||||
InetSocketAddress address = server.getListenerAddress();
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.net.InetSocketAddress;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||
|
@ -36,6 +37,7 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
|
||||
|
||||
@Category({ RPCTests.class, SmallTests.class })
|
||||
|
@ -43,19 +45,6 @@ public class TestRpcHandlerException {
|
|||
|
||||
private final static Configuration CONF = HBaseConfiguration.create();
|
||||
|
||||
/**
|
||||
* Instance of server. We actually don't do anything speical in here so could just use
|
||||
* HBaseRpcServer directly.
|
||||
*/
|
||||
private static class TestRpcServer extends RpcServer {
|
||||
|
||||
TestRpcServer(RpcScheduler scheduler) throws IOException {
|
||||
super(null, "testRpcServer",
|
||||
Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, scheduler);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the
|
||||
* client will hang and the test will fail. The test is meant to be a unit test to test the
|
||||
|
@ -85,7 +74,9 @@ public class TestRpcHandlerException {
|
|||
PriorityFunction qosFunction = mock(PriorityFunction.class);
|
||||
Abortable abortable = new AbortServer();
|
||||
RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
|
||||
RpcServer rpcServer = new TestRpcServer(scheduler);
|
||||
RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
|
||||
Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, scheduler);
|
||||
try (BlockingRpcClient client = new BlockingRpcClient(CONF)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.ipc.RpcClient;
|
|||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||
|
@ -250,7 +251,7 @@ public class TestSecureIPC {
|
|||
|
||||
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
|
||||
|
||||
RpcServerInterface rpcServer = new RpcServer(null, "AbstractTestSecureIPC",
|
||||
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)), isa,
|
||||
serverConf, new FifoRpcScheduler(serverConf, 1));
|
||||
rpcServer.start();
|
||||
|
|
|
@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -40,7 +39,6 @@ import org.apache.hadoop.hbase.ClusterId;
|
|||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -51,14 +49,12 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
|
|||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.WhoAmIResponse;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.security.SecurityInfo;
|
||||
|
@ -78,7 +74,6 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
|||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.net.DNS;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.authorize.Service;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
|
@ -188,8 +183,8 @@ public class TestTokenAuthentication {
|
|||
};
|
||||
sai.add(new BlockingServiceAndInterface(proxy,
|
||||
AuthenticationProtos.AuthenticationService.BlockingInterface.class));
|
||||
this.rpcServer =
|
||||
new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
|
||||
this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai,
|
||||
initialIsa, conf, new FifoRpcScheduler(conf, 1));
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
|
|
Loading…
Reference in New Issue