diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java new file mode 100644 index 00000000000..d6007128579 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.client.VersionInfoUtil; +import org.apache.hadoop.hbase.exceptions.RequestTooBigException; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFutureListener; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext; +import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.CorruptedFrameException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; + + +/** + * Decoder for extracting frame + * + * @since 2.0.0 + */ +@InterfaceAudience.Private +public class NettyRpcFrameDecoder extends ByteToMessageDecoder { + + private static int FRAME_LENGTH_FIELD_LENGTH = 4; + + private final int maxFrameLength; + private boolean requestTooBig; + private String requestTooBigMessage; + + public NettyRpcFrameDecoder(int maxFrameLength) { + this.maxFrameLength = maxFrameLength; + } + + private NettyServerRpcConnection connection; + + void setConnection(NettyServerRpcConnection connection) { + this.connection = connection; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) + throws Exception { + if (requestTooBig) { + handleTooBigRequest(in); + return; + } + + if (in.readableBytes() < FRAME_LENGTH_FIELD_LENGTH) { + return; + } + + long frameLength = in.getUnsignedInt(in.readerIndex()); + + if (frameLength < 0) { + throw new IOException("negative frame length field: " + frameLength); + } + + if (frameLength > maxFrameLength) { + requestTooBig = true; + requestTooBigMessage = + "RPC data length of " + frameLength + " received from " + connection.getHostAddress() + + " is greater than max allowed " + connection.rpcServer.maxRequestSize + ". Set \"" + + SimpleRpcServer.MAX_REQUEST_SIZE + + "\" on server to override this limit (not recommended)"; + + NettyRpcServer.LOG.warn(requestTooBigMessage); + + if (connection.connectionHeaderRead) { + handleTooBigRequest(in); + return; + } + ctx.channel().close(); + return; + } + + int frameLengthInt = (int) frameLength; + if (in.readableBytes() < frameLengthInt) { + return; + } + + in.skipBytes(FRAME_LENGTH_FIELD_LENGTH); + + // extract frame + int readerIndex = in.readerIndex(); + ByteBuf frame = in.retainedSlice(readerIndex, frameLengthInt); + in.readerIndex(readerIndex + frameLengthInt); + + out.add(frame); + } + + private void handleTooBigRequest(ByteBuf in) throws IOException { + in.markReaderIndex(); + int preIndex = in.readerIndex(); + int headerSize = readRawVarint32(in); + if (preIndex == in.readerIndex()) { + return; + } + if (headerSize < 0) { + throw new IOException("negative headerSize: " + headerSize); + } + + if (in.readableBytes() < headerSize) { + in.resetReaderIndex(); + return; + } + + RPCProtos.RequestHeader header = getHeader(in, headerSize); + + // Notify the client about the offending request + NettyServerCall reqTooBig = + new NettyServerCall(header.getCallId(), connection.service, null, null, null, null, + connection, 0, connection.addr, System.currentTimeMillis(), 0, + connection.rpcServer.reservoir, connection.rpcServer.cellBlockBuilder, null); + + connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION); + + // Make sure the client recognizes the underlying exception + // Otherwise, throw a DoNotRetryIOException. + if (VersionInfoUtil.hasMinimumVersion(connection.connectionHeader.getVersionInfo(), + RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) { + reqTooBig.setResponse(null, null, + SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, requestTooBigMessage); + } else { + reqTooBig.setResponse(null, null, new DoNotRetryIOException(), requestTooBigMessage); + } + + // To guarantee that the message is written and flushed before closing the channel, + // we should call channel.writeAndFlush() directly to add the close listener + // instead of calling reqTooBig.sendResponseIfReady() + reqTooBig.param = null; + connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE); + } + + private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException { + ByteBuf msg = in.readRetainedSlice(headerSize); + try { + byte[] array; + int offset; + int length = msg.readableBytes(); + if (msg.hasArray()) { + array = msg.array(); + offset = msg.arrayOffset() + msg.readerIndex(); + } else { + array = new byte[length]; + msg.getBytes(msg.readerIndex(), array, 0, length); + offset = 0; + } + + RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder(); + ProtobufUtil.mergeFrom(builder, array, offset, length); + return builder.build(); + } finally { + msg.release(); + } + } + + /** + * Reads variable length 32bit int from buffer + * This method is from ProtobufVarint32FrameDecoder in Netty and modified a little bit + * to pass the cyeckstyle rule. + * + * @return decoded int if buffers readerIndex has been forwarded else nonsense value + */ + private static int readRawVarint32(ByteBuf buffer) { + if (!buffer.isReadable()) { + return 0; + } + buffer.markReaderIndex(); + byte tmp = buffer.readByte(); + if (tmp >= 0) { + return tmp; + } else { + int result = tmp & 127; + if (!buffer.isReadable()) { + buffer.resetReaderIndex(); + return 0; + } + tmp = buffer.readByte(); + if (tmp >= 0) { + result |= tmp << 7; + } else { + result |= (tmp & 127) << 7; + if (!buffer.isReadable()) { + buffer.resetReaderIndex(); + return 0; + } + tmp = buffer.readByte(); + if (tmp >= 0) { + result |= tmp << 14; + } else { + result |= (tmp & 127) << 14; + if (!buffer.isReadable()) { + buffer.resetReaderIndex(); + return 0; + } + tmp = buffer.readByte(); + if (tmp >= 0) { + result |= tmp << 21; + } else { + result |= (tmp & 127) << 21; + if (!buffer.isReadable()) { + buffer.resetReaderIndex(); + return 0; + } + tmp = buffer.readByte(); + result |= tmp << 28; + if (tmp < 0) { + throw new CorruptedFrameException("malformed varint."); + } + } + } + } + return result; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index f86fa774bac..4282a8faa49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.shaded.io.netty.channel.group.DefaultChannelGroup import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.FixedLengthFrameDecoder; -import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.DefaultThreadFactory; import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.GlobalEventExecutor; @@ -48,6 +47,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.HBasePolicyProvider; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; @@ -97,10 +97,8 @@ public class NettyRpcServer extends RpcServer { FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); preambleDecoder.setSingleDecode(true); pipeline.addLast("preambleDecoder", preambleDecoder); - pipeline.addLast("preambleHandler", - new NettyRpcServerPreambleHandler(NettyRpcServer.this)); - pipeline.addLast("frameDecoder", - new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true)); + pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler()); + pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize)); pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics)); pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); } @@ -115,6 +113,11 @@ public class NettyRpcServer extends RpcServer { this.scheduler.init(new RpcSchedulerContext(this)); } + @VisibleForTesting + protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { + return new NettyRpcServerPreambleHandler(NettyRpcServer.this); + } + @Override public synchronized void start() { if (started) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java index 333426da8ad..6823e0ada46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf; +import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext; import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelPipeline; import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler; @@ -25,6 +26,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandl import java.nio.ByteBuffer; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** * Handle connection preamble. @@ -41,7 +43,7 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - NettyServerRpcConnection conn = new NettyServerRpcConnection(rpcServer, ctx.channel()); + NettyServerRpcConnection conn = createNettyServerRpcConnection(ctx.channel()); ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes()); msg.readBytes(buf); buf.flip(); @@ -50,9 +52,14 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler return; } ChannelPipeline p = ctx.pipeline(); + ((NettyRpcFrameDecoder) p.get("frameDecoder")).setConnection(conn); ((NettyRpcServerRequestDecoder) p.get("decoder")).setConnection(conn); p.remove(this); p.remove("preambleDecoder"); } + @VisibleForTesting + protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { + return new NettyServerRpcConnection(rpcServer, channel); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 4a4272f74e5..6befe8fee5f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -33,7 +33,6 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; @@ -46,8 +45,8 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; -import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; @@ -74,9 +73,13 @@ public abstract class AbstractTestIPC { private static final byte[] CELL_BYTES = Bytes.toBytes("xyz"); private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); - static byte[] BIG_CELL_BYTES = new byte[10 * 1024]; - static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES); - static final Configuration CONF = HBaseConfiguration.create(); + + protected static final Configuration CONF = HBaseConfiguration.create(); + + protected abstract RpcServer createRpcServer(final Server server, final String name, + final List services, + final InetSocketAddress bindAddress, Configuration conf, + RpcScheduler scheduler) throws IOException; protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf); @@ -86,8 +89,8 @@ public abstract class AbstractTestIPC { @Test public void testNoCodec() throws IOException, ServiceException { Configuration conf = HBaseConfiguration.create(); - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient client = createRpcClientNoCodec(conf)) { @@ -119,10 +122,11 @@ public abstract class AbstractTestIPC { for (int i = 0; i < count; i++) { cells.add(CELL); } - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); + try (AbstractRpcClient client = createRpcClient(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); @@ -149,8 +153,8 @@ public abstract class AbstractTestIPC { @Test public void testRTEDuringConnectionSetup() throws Exception { Configuration conf = HBaseConfiguration.create(); - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf)) { @@ -172,10 +176,9 @@ public abstract class AbstractTestIPC { @Test public void testRpcScheduler() throws IOException, ServiceException, InterruptedException { RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", - Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("localhost", 0), CONF, scheduler); + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, scheduler); verify(scheduler).init((RpcScheduler.Context) anyObject()); try (AbstractRpcClient client = createRpcClient(CONF)) { rpcServer.start(); @@ -197,8 +200,8 @@ public abstract class AbstractTestIPC { public void testRpcMaxRequestSize() throws IOException, ServiceException { Configuration conf = new Configuration(CONF); conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000); - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1)); try (AbstractRpcClient client = createRpcClient(conf)) { @@ -231,8 +234,8 @@ public abstract class AbstractTestIPC { @Test public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException, ServiceException { - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); @@ -248,8 +251,8 @@ public abstract class AbstractTestIPC { @Test public void testRemoteError() throws IOException, ServiceException { - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient client = createRpcClient(CONF)) { @@ -268,8 +271,8 @@ public abstract class AbstractTestIPC { @Test public void testTimeout() throws IOException { - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient client = createRpcClient(CONF)) { @@ -299,47 +302,20 @@ public abstract class AbstractTestIPC { } } - static class TestFailingRpcServer extends SimpleRpcServer { - - TestFailingRpcServer() throws IOException { - this(new FifoRpcScheduler(CONF, 1), CONF); - } - - TestFailingRpcServer(Configuration conf) throws IOException { - this(new FifoRpcScheduler(conf, 1), conf); - } - - TestFailingRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException { - super(null, "testRpcServer", Lists - .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("localhost", 0), conf, scheduler); - } - - class FailingConnection extends SimpleServerRpcConnection { - public FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel, - long lastContact) { - super(rpcServer, channel, lastContact); - } - - @Override - public void processRequest(ByteBuff buf) throws IOException, InterruptedException { - // this will throw exception after the connection header is read, and an RPC is sent - // from client - throw new DoNotRetryIOException("Failing for test"); - } - } - - @Override - protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { - return new FailingConnection(this, channel, time); - } - } + protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name, + final List services, + final InetSocketAddress bindAddress, Configuration conf, + RpcScheduler scheduler) throws IOException; /** Tests that the connection closing is handled by the client with outstanding RPC calls */ @Test public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException { Configuration conf = new Configuration(CONF); - RpcServer rpcServer = new TestFailingRpcServer(conf); + RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( + SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, + new FifoRpcScheduler(CONF, 1)); + try (AbstractRpcClient client = createRpcClient(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); @@ -356,8 +332,8 @@ public abstract class AbstractTestIPC { @Test public void testAsyncEcho() throws IOException { Configuration conf = HBaseConfiguration.create(); - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient client = createRpcClient(conf)) { @@ -387,8 +363,8 @@ public abstract class AbstractTestIPC { @Test public void testAsyncRemoteError() throws IOException { AbstractRpcClient client = createRpcClient(CONF); - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); try { @@ -411,8 +387,8 @@ public abstract class AbstractTestIPC { @Test public void testAsyncTimeout() throws IOException { - RpcServer rpcServer = RpcServerFactory.createRpcServer(null, - "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface( + RpcServer rpcServer = createRpcServer(null, "testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient client = createRpcClient(CONF)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java index 98efcfbebb2..9c91d1d39fa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java @@ -19,9 +19,15 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.experimental.categories.Category; @@ -29,6 +35,12 @@ import org.junit.experimental.categories.Category; @Category({ RPCTests.class, SmallTests.class }) public class TestBlockingIPC extends AbstractTestIPC { + @Override protected RpcServer createRpcServer(Server server, String name, + List services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler); + } + @Override protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) { return new BlockingRpcClient(conf) { @@ -55,4 +67,39 @@ public class TestBlockingIPC extends AbstractTestIPC { } }; } + + private static class TestFailingRpcServer extends SimpleRpcServer { + + TestFailingRpcServer(Server server, String name, + List services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + super(server, name, services, bindAddress, conf, scheduler); + } + + final class FailingConnection extends SimpleServerRpcConnection { + private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel, + long lastContact) { + super(rpcServer, channel, lastContact); + } + + @Override + public void processRequest(ByteBuff buf) throws IOException, InterruptedException { + // this will throw exception after the connection header is read, and an RPC is sent + // from client + throw new DoNotRetryIOException("Failing for test"); + } + } + + @Override + protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { + return new FailingConnection(this, channel, time); + } + } + + @Override + protected RpcServer createTestFailingRpcServer(Server server, String name, + List services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java index 6a39e120524..aa534d46cc5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java @@ -17,17 +17,17 @@ */ package org.apache.hadoop.hbase.ipc; -import org.apache.hadoop.hbase.shaded.io.netty.channel.epoll.EpollEventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.epoll.EpollSocketChannel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel; - +import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.JVM; @@ -39,6 +39,13 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.epoll.EpollEventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.channel.epoll.EpollSocketChannel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel; + + @RunWith(Parameterized.class) @Category({ RPCTests.class, SmallTests.class }) public class TestNettyIPC extends AbstractTestIPC { @@ -95,6 +102,13 @@ public class TestNettyIPC extends AbstractTestIPC { } } + @Override + protected RpcServer createRpcServer(Server server, String name, + List services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler); + } + @Override protected NettyRpcClient createRpcClientNoCodec(Configuration conf) { setConf(conf); @@ -125,4 +139,43 @@ public class TestNettyIPC extends AbstractTestIPC { } }; } + + private static class TestFailingRpcServer extends NettyRpcServer { + + TestFailingRpcServer(Server server, String name, + List services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + super(server, name, services, bindAddress, conf, scheduler); + } + + final class FailingConnection extends NettyServerRpcConnection { + private FailingConnection(TestFailingRpcServer rpcServer, Channel channel) { + super(rpcServer, channel); + } + + @Override + public void processRequest(ByteBuff buf) throws IOException, InterruptedException { + // this will throw exception after the connection header is read, and an RPC is sent + // from client + throw new DoNotRetryIOException("Failing for test"); + } + } + + @Override + protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { + return new NettyRpcServerPreambleHandler(TestFailingRpcServer.this) { + @Override + protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { + return new FailingConnection(TestFailingRpcServer.this, channel); + } + }; + } + } + + @Override + protected RpcServer createTestFailingRpcServer(Server server, String name, + List services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler); + } }