HBASE-14771 RpcServer#getRemoteAddress always returns null (Abhishek Kumar)

This commit is contained in:
tedyu 2015-11-17 18:46:08 -08:00
parent d6fdf92f9e
commit 1b13bfcd43
2 changed files with 82 additions and 1 deletions

View File

@ -1898,7 +1898,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
: null; : null;
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize, traceInfo, RpcServer.getRemoteIp()); totalRequestSize, traceInfo, this.addr);
scheduler.dispatch(new CallRunner(RpcServer.this, call)); scheduler.dispatch(new CallRunner(RpcServer.this, call));
} }

View File

@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times; import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -37,23 +38,28 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message; import com.google.protobuf.Message;
@ -282,4 +288,79 @@ public abstract class AbstractTestIPC {
verify(scheduler).stop(); verify(scheduler).stop();
} }
} }
/**
* Instance of RpcServer that echoes client hostAddress back to client
*/
static class TestRpcServer1 extends RpcServer {
private static BlockingInterface SERVICE1 =
new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
@Override
public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request)
throws ServiceException {
return EmptyResponseProto.newBuilder().build();
}
@Override
public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
throws ServiceException {
final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress();
final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress();
return EchoResponseProto.newBuilder().setMessage(message).build();
}
@Override
public EmptyResponseProto error(RpcController unused, EmptyRequestProto request)
throws ServiceException {
throw new ServiceException("error", new IOException("error"));
}
};
TestRpcServer1() throws IOException {
this(new FifoRpcScheduler(CONF, 1));
}
TestRpcServer1(RpcScheduler scheduler) throws IOException {
super(null, "testRemoteAddressInCallObject", Lists
.newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto
.newReflectiveBlockingService(SERVICE1), null)),
new InetSocketAddress("localhost", 0), CONF, scheduler);
}
}
/**
* Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
* remoteAddress set to its Call Object
* @throws ServiceException
*/
@Test
public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException,
ServiceException {
final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
final AbstractRpcClient client =
new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null);
try {
rpcServer.start();
final InetSocketAddress isa = rpcServer.getListenerAddress();
if (isa == null) {
throw new IOException("Listener channel is closed");
}
final BlockingRpcChannel channel =
client.createBlockingRpcChannel(
ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0);
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
final EchoRequestProto echoRequest =
EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage());
} finally {
client.close();
rpcServer.stop();
}
}
} }