HBASE-25336 Use Address instead of InetSocketAddress in RpcClient implementation (#2716)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
7d0a687e57
commit
f813479510
|
@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
|
|||
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -320,7 +318,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
* @return A pair with the Message response and the Cell data (if any).
|
||||
*/
|
||||
private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
|
||||
Message param, Message returnType, final User ticket, final InetSocketAddress isa)
|
||||
Message param, Message returnType, final User ticket, final Address isa)
|
||||
throws ServiceException {
|
||||
BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
|
||||
callMethod(md, hrc, param, returnType, ticket, isa, done);
|
||||
|
@ -392,7 +390,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
|
||||
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
|
||||
final Message param, Message returnType, final User ticket,
|
||||
final InetSocketAddress inetAddr, final RpcCallback<Message> callback) {
|
||||
final Address addr, final RpcCallback<Message> callback) {
|
||||
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
|
||||
cs.setStartTime(EnvironmentEdgeManager.currentTime());
|
||||
|
||||
|
@ -406,7 +404,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
cs.setNumActionsPerServer(numActions);
|
||||
}
|
||||
|
||||
final Address addr = Address.fromSocketAddress(inetAddr);
|
||||
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
|
||||
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
|
||||
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
|
||||
|
@ -520,13 +517,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
|
||||
protected final Address addr;
|
||||
|
||||
// We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
|
||||
// per method call on the channel. If the remote target is removed or reprovisioned and
|
||||
// its identity changes a new channel with a newly resolved InetSocketAddress will be
|
||||
// created as part of retry, so caching here is fine.
|
||||
// Normally, caching an InetSocketAddress is an anti-pattern.
|
||||
protected InetSocketAddress isa;
|
||||
|
||||
protected final AbstractRpcClient<?> rpcClient;
|
||||
|
||||
protected final User ticket;
|
||||
|
@ -577,22 +567,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
@Override
|
||||
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
|
||||
Message param, Message returnType) throws ServiceException {
|
||||
// Look up remote address upon first call
|
||||
if (isa == null) {
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookups();
|
||||
}
|
||||
isa = Address.toSocketAddress(addr);
|
||||
if (isa.isUnresolved()) {
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookupsFailed();
|
||||
}
|
||||
isa = null;
|
||||
throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
|
||||
}
|
||||
}
|
||||
return rpcClient.callBlockingMethod(md, configureRpcController(controller),
|
||||
param, returnType, ticket, isa);
|
||||
return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType,
|
||||
ticket, addr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -608,29 +584,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
}
|
||||
|
||||
@Override
|
||||
public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
|
||||
Message param, Message returnType, RpcCallback<Message> done) {
|
||||
HBaseRpcController configuredController =
|
||||
configureRpcController(Preconditions.checkNotNull(controller,
|
||||
"RpcController can not be null for async rpc call"));
|
||||
// Look up remote address upon first call
|
||||
if (isa == null || isa.isUnresolved()) {
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookups();
|
||||
}
|
||||
isa = Address.toSocketAddress(addr);
|
||||
if (isa.isUnresolved()) {
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookupsFailed();
|
||||
}
|
||||
isa = null;
|
||||
controller.setFailed(addr + " could not be resolved");
|
||||
return;
|
||||
}
|
||||
}
|
||||
public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param,
|
||||
Message returnType, RpcCallback<Message> done) {
|
||||
HBaseRpcController configuredController = configureRpcController(
|
||||
Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call"));
|
||||
// This method does not throw any exceptions, so the caller must provide a
|
||||
// HBaseRpcController which is used to pass the exceptions.
|
||||
this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
|
||||
this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,6 @@ import java.io.OutputStream;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Locale;
|
||||
|
@ -44,7 +43,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import javax.security.sasl.SaslException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
|
@ -52,7 +50,6 @@ import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
|||
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
|
||||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
|
||||
import org.apache.hadoop.hbase.security.SaslUtil;
|
||||
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
|
||||
|
@ -69,11 +66,13 @@ import org.apache.htrace.core.TraceScope;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
|
||||
|
@ -256,16 +255,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
|||
if (this.rpcClient.localAddr != null) {
|
||||
this.socket.bind(this.rpcClient.localAddr);
|
||||
}
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookups();
|
||||
}
|
||||
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
|
||||
if (remoteAddr.isUnresolved()) {
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookupsFailed();
|
||||
}
|
||||
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
|
||||
}
|
||||
InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
|
||||
NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
|
||||
this.socket.setSoTimeout(this.rpcClient.readTO);
|
||||
return;
|
||||
|
@ -374,15 +364,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
|||
if (this.metrics != null) {
|
||||
this.metrics.incrNsLookups();
|
||||
}
|
||||
InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
|
||||
if (serverAddr.isUnresolved()) {
|
||||
if (this.metrics != null) {
|
||||
this.metrics.incrNsLookupsFailed();
|
||||
}
|
||||
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
|
||||
}
|
||||
saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
|
||||
serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed,
|
||||
socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed,
|
||||
this.rpcClient.conf.get("hbase.rpc.protection",
|
||||
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
|
||||
this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
|
||||
|
|
|
@ -32,17 +32,16 @@ import java.util.concurrent.ThreadLocalRandom;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
|
||||
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
|
||||
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||
|
@ -210,18 +209,9 @@ class NettyRpcConnection extends RpcConnection {
|
|||
Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
|
||||
final NettyHBaseSaslRpcClientHandler saslHandler;
|
||||
try {
|
||||
if (this.metrics != null) {
|
||||
this.metrics.incrNsLookups();
|
||||
}
|
||||
InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
|
||||
if (serverAddr.isUnresolved()) {
|
||||
if (this.metrics != null) {
|
||||
this.metrics.incrNsLookupsFailed();
|
||||
}
|
||||
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
|
||||
}
|
||||
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
|
||||
serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
|
||||
((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo,
|
||||
rpcClient.fallbackAllowed, this.rpcClient.conf);
|
||||
} catch (IOException e) {
|
||||
failInit(ch, e);
|
||||
return;
|
||||
|
@ -282,16 +272,7 @@ class NettyRpcConnection extends RpcConnection {
|
|||
private void connect() throws UnknownHostException {
|
||||
assert eventLoop.inEventLoop();
|
||||
LOG.trace("Connecting to {}", remoteId.getAddress());
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookups();
|
||||
}
|
||||
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
|
||||
if (remoteAddr.isUnresolved()) {
|
||||
if (this.rpcClient.metrics != null) {
|
||||
this.rpcClient.metrics.incrNsLookupsFailed();
|
||||
}
|
||||
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
|
||||
}
|
||||
InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
|
||||
this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
|
||||
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
|
||||
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
|
||||
|
|
|
@ -18,12 +18,15 @@
|
|||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.security.SecurityInfo;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
|
||||
|
@ -122,7 +125,7 @@ abstract class RpcConnection {
|
|||
this.remoteId = remoteId;
|
||||
}
|
||||
|
||||
protected void scheduleTimeoutTask(final Call call) {
|
||||
protected final void scheduleTimeoutTask(final Call call) {
|
||||
if (call.timeout > 0) {
|
||||
call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() {
|
||||
|
||||
|
@ -137,7 +140,7 @@ abstract class RpcConnection {
|
|||
}
|
||||
}
|
||||
|
||||
protected byte[] getConnectionHeaderPreamble() {
|
||||
protected final byte[] getConnectionHeaderPreamble() {
|
||||
// Assemble the preamble up in a buffer first and then send it. Writing individual elements,
|
||||
// they are getting sent across piecemeal according to wireshark and then server is messing
|
||||
// up the reading on occasion (the passed in stream is not buffered yet).
|
||||
|
@ -153,7 +156,7 @@ abstract class RpcConnection {
|
|||
return preamble;
|
||||
}
|
||||
|
||||
protected ConnectionHeader getConnectionHeader() {
|
||||
protected final ConnectionHeader getConnectionHeader() {
|
||||
final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
|
||||
builder.setServiceName(remoteId.getServiceName());
|
||||
final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket);
|
||||
|
@ -176,6 +179,21 @@ abstract class RpcConnection {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics)
|
||||
throws UnknownHostException {
|
||||
if (metrics != null) {
|
||||
metrics.incrNsLookups();
|
||||
}
|
||||
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
|
||||
if (remoteAddr.isUnresolved()) {
|
||||
if (metrics != null) {
|
||||
metrics.incrNsLookupsFailed();
|
||||
}
|
||||
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
|
||||
}
|
||||
return remoteAddr;
|
||||
}
|
||||
|
||||
protected abstract void callTimeout(Call call);
|
||||
|
||||
public ConnectionId remoteId() {
|
||||
|
|
Loading…
Reference in New Issue