HBASE-25336 Use Address instead of InetSocketAddress in RpcClient implementation (#2716)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Duo Zhang 2020-12-07 21:49:04 +08:00
parent 7bb3ab7687
commit ec47e46c7e
4 changed files with 39 additions and 97 deletions

View File

@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -320,7 +318,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
* @return A pair with the Message response and the Cell data (if any). * @return A pair with the Message response and the Cell data (if any).
*/ */
private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc, private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
Message param, Message returnType, final User ticket, final InetSocketAddress isa) Message param, Message returnType, final User ticket, final Address isa)
throws ServiceException { throws ServiceException {
BlockingRpcCallback<Message> done = new BlockingRpcCallback<>(); BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
callMethod(md, hrc, param, returnType, ticket, isa, done); callMethod(md, hrc, param, returnType, ticket, isa, done);
@ -392,7 +390,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final Message param, Message returnType, final User ticket,
final InetSocketAddress inetAddr, final RpcCallback<Message> callback) { final Address addr, final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime()); cs.setStartTime(EnvironmentEdgeManager.currentTime());
@ -406,7 +404,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
cs.setNumActionsPerServer(numActions); cs.setNumActionsPerServer(numActions);
} }
final Address addr = Address.fromSocketAddress(inetAddr);
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() { hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@ -520,13 +517,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
protected final Address addr; protected final Address addr;
// We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
// per method call on the channel. If the remote target is removed or reprovisioned and
// its identity changes a new channel with a newly resolved InetSocketAddress will be
// created as part of retry, so caching here is fine.
// Normally, caching an InetSocketAddress is an anti-pattern.
protected InetSocketAddress isa;
protected final AbstractRpcClient<?> rpcClient; protected final AbstractRpcClient<?> rpcClient;
protected final User ticket; protected final User ticket;
@ -576,23 +566,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
@Override @Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException { Message param, Message returnType) throws ServiceException {
// Look up remote address upon first call return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType,
if (isa == null) { ticket, addr);
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookups();
}
isa = Address.toSocketAddress(addr);
if (isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
isa = null;
throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
}
}
return rpcClient.callBlockingMethod(md, configureRpcController(controller),
param, returnType, ticket, isa);
} }
} }
@ -608,29 +584,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
} }
@Override @Override
public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param,
Message param, Message returnType, RpcCallback<Message> done) { Message returnType, RpcCallback<Message> done) {
HBaseRpcController configuredController = HBaseRpcController configuredController = configureRpcController(
configureRpcController(Preconditions.checkNotNull(controller, Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call"));
"RpcController can not be null for async rpc call"));
// Look up remote address upon first call
if (isa == null || isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookups();
}
isa = Address.toSocketAddress(addr);
if (isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
isa = null;
controller.setFailed(addr + " could not be resolved");
return;
}
}
// This method does not throw any exceptions, so the caller must provide a // This method does not throw any exceptions, so the caller must provide a
// HBaseRpcController which is used to pass the exceptions. // HBaseRpcController which is used to pass the exceptions.
this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done); this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
} }
} }
} }

View File

@ -35,7 +35,6 @@ import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Locale; import java.util.Locale;
@ -44,7 +43,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import javax.security.sasl.SaslException; import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -52,7 +50,6 @@ import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@ -69,11 +66,13 @@ import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder; import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@ -256,16 +255,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (this.rpcClient.localAddr != null) { if (this.rpcClient.localAddr != null) {
this.socket.bind(this.rpcClient.localAddr); this.socket.bind(this.rpcClient.localAddr);
} }
if (this.rpcClient.metrics != null) { InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
this.rpcClient.metrics.incrNsLookups();
}
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
if (remoteAddr.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
}
NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO); NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
this.socket.setSoTimeout(this.rpcClient.readTO); this.socket.setSoTimeout(this.rpcClient.readTO);
return; return;
@ -374,15 +364,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (this.metrics != null) { if (this.metrics != null) {
this.metrics.incrNsLookups(); this.metrics.incrNsLookups();
} }
InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
if (serverAddr.isUnresolved()) {
if (this.metrics != null) {
this.metrics.incrNsLookupsFailed();
}
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
}
saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token, saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed, socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed,
this.rpcClient.conf.get("hbase.rpc.protection", this.rpcClient.conf.get("hbase.rpc.protection",
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT)); this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));

View File

@ -32,17 +32,16 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler; import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder; import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@ -210,18 +209,9 @@ class NettyRpcConnection extends RpcConnection {
Promise<Boolean> saslPromise = ch.eventLoop().newPromise(); Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
final NettyHBaseSaslRpcClientHandler saslHandler; final NettyHBaseSaslRpcClientHandler saslHandler;
try { try {
if (this.metrics != null) {
this.metrics.incrNsLookups();
}
InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
if (serverAddr.isUnresolved()) {
if (this.metrics != null) {
this.metrics.incrNsLookupsFailed();
}
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
}
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token, saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf); ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo,
rpcClient.fallbackAllowed, this.rpcClient.conf);
} catch (IOException e) { } catch (IOException e) {
failInit(ch, e); failInit(ch, e);
return; return;
@ -282,16 +272,7 @@ class NettyRpcConnection extends RpcConnection {
private void connect() throws UnknownHostException { private void connect() throws UnknownHostException {
assert eventLoop.inEventLoop(); assert eventLoop.inEventLoop();
LOG.trace("Connecting to {}", remoteId.getAddress()); LOG.trace("Connecting to {}", remoteId.getAddress());
if (this.rpcClient.metrics != null) { InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
this.rpcClient.metrics.incrNsLookups();
}
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
if (remoteAddr.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
}
this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass) this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)

View File

@ -18,12 +18,15 @@
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.SecurityInfo; import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
@ -122,7 +125,7 @@ abstract class RpcConnection {
this.remoteId = remoteId; this.remoteId = remoteId;
} }
protected void scheduleTimeoutTask(final Call call) { protected final void scheduleTimeoutTask(final Call call) {
if (call.timeout > 0) { if (call.timeout > 0) {
call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() { call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() {
@ -137,7 +140,7 @@ abstract class RpcConnection {
} }
} }
protected byte[] getConnectionHeaderPreamble() { protected final byte[] getConnectionHeaderPreamble() {
// Assemble the preamble up in a buffer first and then send it. Writing individual elements, // Assemble the preamble up in a buffer first and then send it. Writing individual elements,
// they are getting sent across piecemeal according to wireshark and then server is messing // they are getting sent across piecemeal according to wireshark and then server is messing
// up the reading on occasion (the passed in stream is not buffered yet). // up the reading on occasion (the passed in stream is not buffered yet).
@ -153,7 +156,7 @@ abstract class RpcConnection {
return preamble; return preamble;
} }
protected ConnectionHeader getConnectionHeader() { protected final ConnectionHeader getConnectionHeader() {
final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
builder.setServiceName(remoteId.getServiceName()); builder.setServiceName(remoteId.getServiceName());
final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket); final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket);
@ -176,6 +179,21 @@ abstract class RpcConnection {
return builder.build(); return builder.build();
} }
protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics)
throws UnknownHostException {
if (metrics != null) {
metrics.incrNsLookups();
}
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
if (remoteAddr.isUnresolved()) {
if (metrics != null) {
metrics.incrNsLookupsFailed();
}
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
}
return remoteAddr;
}
protected abstract void callTimeout(Call call); protected abstract void callTimeout(Call call);
public ConnectionId remoteId() { public ConnectionId remoteId() {