HBASE-14771 RpcServer#getRemoteAddress always returns null (Abhishek Kumar)
This commit is contained in:
parent
e40ed0e836
commit
5362ac0311
|
@ -1882,7 +1882,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
|
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
|
||||||
: null;
|
: null;
|
||||||
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
|
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
|
||||||
totalRequestSize, traceInfo, RpcServer.getRemoteIp());
|
totalRequestSize, traceInfo, this.addr);
|
||||||
scheduler.dispatch(new CallRunner(RpcServer.this, call));
|
scheduler.dispatch(new CallRunner(RpcServer.this, call));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.internal.verification.VerificationModeFactory.times;
|
import static org.mockito.internal.verification.VerificationModeFactory.times;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -37,23 +38,28 @@ import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
|
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
|
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
|
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
|
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
|
||||||
|
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.protobuf.BlockingRpcChannel;
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
|
@ -282,4 +288,79 @@ public abstract class AbstractTestIPC {
|
||||||
verify(scheduler).stop();
|
verify(scheduler).stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Instance of RpcServer that echoes client hostAddress back to client
|
||||||
|
*/
|
||||||
|
static class TestRpcServer1 extends RpcServer {
|
||||||
|
|
||||||
|
private static BlockingInterface SERVICE1 =
|
||||||
|
new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
|
||||||
|
@Override
|
||||||
|
public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
return EmptyResponseProto.newBuilder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress();
|
||||||
|
final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress();
|
||||||
|
return EchoResponseProto.newBuilder().setMessage(message).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EmptyResponseProto error(RpcController unused, EmptyRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
throw new ServiceException("error", new IOException("error"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
TestRpcServer1() throws IOException {
|
||||||
|
this(new FifoRpcScheduler(CONF, 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
TestRpcServer1(RpcScheduler scheduler) throws IOException {
|
||||||
|
super(null, "testRemoteAddressInCallObject", Lists
|
||||||
|
.newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto
|
||||||
|
.newReflectiveBlockingService(SERVICE1), null)),
|
||||||
|
new InetSocketAddress("localhost", 0), CONF, scheduler);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
|
||||||
|
* remoteAddress set to its Call Object
|
||||||
|
* @throws ServiceException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException,
|
||||||
|
ServiceException {
|
||||||
|
final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
|
||||||
|
final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
|
||||||
|
final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
|
||||||
|
final AbstractRpcClient client =
|
||||||
|
new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null);
|
||||||
|
try {
|
||||||
|
rpcServer.start();
|
||||||
|
final InetSocketAddress isa = rpcServer.getListenerAddress();
|
||||||
|
if (isa == null) {
|
||||||
|
throw new IOException("Listener channel is closed");
|
||||||
|
}
|
||||||
|
final BlockingRpcChannel channel =
|
||||||
|
client.createBlockingRpcChannel(
|
||||||
|
ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()),
|
||||||
|
User.getCurrent(), 0);
|
||||||
|
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
|
||||||
|
TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
|
||||||
|
final EchoRequestProto echoRequest =
|
||||||
|
EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
|
||||||
|
final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
|
||||||
|
Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage());
|
||||||
|
} finally {
|
||||||
|
client.close();
|
||||||
|
rpcServer.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue