From 5362ac03116c9b44f0e82a3d69f194f2fc467e51 Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Nov 2015 18:49:05 -0800 Subject: [PATCH] HBASE-14771 RpcServer#getRemoteAddress always returns null (Abhishek Kumar) --- .../apache/hadoop/hbase/ipc/RpcServer.java | 2 +- .../hadoop/hbase/ipc/AbstractTestIPC.java | 81 +++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 30e5e867d96..13846532c76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1882,7 +1882,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) : null; Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, - totalRequestSize, traceInfo, RpcServer.getRemoteIp()); + totalRequestSize, traceInfo, this.addr); scheduler.dispatch(new CallRunner(RpcServer.this, call)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index dffd8e9b3cf..5df1edc29c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -37,23 +38,28 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; +import org.junit.Assert; import org.junit.Test; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.BlockingService; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; @@ -282,4 +288,79 @@ public abstract class AbstractTestIPC { verify(scheduler).stop(); } } + + /** + * Instance of RpcServer that echoes client hostAddress back to client + */ + static class TestRpcServer1 extends RpcServer { + + private static BlockingInterface SERVICE1 = + new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { + @Override + public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request) + throws ServiceException { + return EmptyResponseProto.newBuilder().build(); + } + + @Override + public EchoResponseProto echo(RpcController unused, EchoRequestProto request) + throws ServiceException { + final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress(); + final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress(); + return EchoResponseProto.newBuilder().setMessage(message).build(); + } + + @Override + public EmptyResponseProto error(RpcController unused, EmptyRequestProto request) + throws ServiceException { + throw new ServiceException("error", new IOException("error")); + } + }; + + TestRpcServer1() throws IOException { + this(new FifoRpcScheduler(CONF, 1)); + } + + TestRpcServer1(RpcScheduler scheduler) throws IOException { + super(null, "testRemoteAddressInCallObject", Lists + .newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto + .newReflectiveBlockingService(SERVICE1), null)), + new InetSocketAddress("localhost", 0), CONF, scheduler); + } + } + + /** + * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null + * remoteAddress set to its Call Object + * @throws ServiceException + */ + @Test + public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException, + ServiceException { + final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1); + final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler); + final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); + final AbstractRpcClient client = + new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null); + try { + rpcServer.start(); + final InetSocketAddress isa = rpcServer.getListenerAddress(); + if (isa == null) { + throw new IOException("Listener channel is closed"); + } + final BlockingRpcChannel channel = + client.createBlockingRpcChannel( + ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0); + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); + final EchoRequestProto echoRequest = + EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build(); + final EchoResponseProto echoResponse = stub.echo(null, echoRequest); + Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage()); + } finally { + client.close(); + rpcServer.stop(); + } + } }