From f8134795109bc380b53ec814561e1abdb56b2b58 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 7 Dec 2020 21:49:04 +0800 Subject: [PATCH] HBASE-25336 Use Address instead of InetSocketAddress in RpcClient implementation (#2716) Signed-off-by: Guanghao Zhang --- .../hadoop/hbase/ipc/AbstractRpcClient.java | 60 ++++--------------- .../hbase/ipc/BlockingRpcConnection.java | 25 ++------ .../hadoop/hbase/ipc/NettyRpcConnection.java | 27 ++------- .../hadoop/hbase/ipc/RpcConnection.java | 24 +++++++- 4 files changed, 39 insertions(+), 97 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 7a7b848304f..e9ec6a92ee9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.UnknownHostException; import java.util.Collection; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -320,7 +318,7 @@ public abstract class AbstractRpcClient implements RpcC * @return A pair with the Message response and the Cell data (if any). */ private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc, - Message param, Message returnType, final User ticket, final InetSocketAddress isa) + Message param, Message returnType, final User ticket, final Address isa) throws ServiceException { BlockingRpcCallback done = new BlockingRpcCallback<>(); callMethod(md, hrc, param, returnType, ticket, isa, done); @@ -392,7 +390,7 @@ public abstract class AbstractRpcClient implements RpcC Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, final Message param, Message returnType, final User ticket, - final InetSocketAddress inetAddr, final RpcCallback callback) { + final Address addr, final RpcCallback callback) { final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); cs.setStartTime(EnvironmentEdgeManager.currentTime()); @@ -406,7 +404,6 @@ public abstract class AbstractRpcClient implements RpcC cs.setNumActionsPerServer(numActions); } - final Address addr = Address.fromSocketAddress(inetAddr); final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback() { @@ -520,13 +517,6 @@ public abstract class AbstractRpcClient implements RpcC protected final Address addr; - // We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup - // per method call on the channel. If the remote target is removed or reprovisioned and - // its identity changes a new channel with a newly resolved InetSocketAddress will be - // created as part of retry, so caching here is fine. - // Normally, caching an InetSocketAddress is an anti-pattern. - protected InetSocketAddress isa; - protected final AbstractRpcClient rpcClient; protected final User ticket; @@ -576,23 +566,9 @@ public abstract class AbstractRpcClient implements RpcC @Override public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, - Message param, Message returnType) throws ServiceException { - // Look up remote address upon first call - if (isa == null) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookups(); - } - isa = Address.toSocketAddress(addr); - if (isa.isUnresolved()) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookupsFailed(); - } - isa = null; - throw new ServiceException(new UnknownHostException(addr + " could not be resolved")); - } - } - return rpcClient.callBlockingMethod(md, configureRpcController(controller), - param, returnType, ticket, isa); + Message param, Message returnType) throws ServiceException { + return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType, + ticket, addr); } } @@ -608,29 +584,13 @@ public abstract class AbstractRpcClient implements RpcC } @Override - public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, - Message param, Message returnType, RpcCallback done) { - HBaseRpcController configuredController = - configureRpcController(Preconditions.checkNotNull(controller, - "RpcController can not be null for async rpc call")); - // Look up remote address upon first call - if (isa == null || isa.isUnresolved()) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookups(); - } - isa = Address.toSocketAddress(addr); - if (isa.isUnresolved()) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookupsFailed(); - } - isa = null; - controller.setFailed(addr + " could not be resolved"); - return; - } - } + public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, + Message returnType, RpcCallback done) { + HBaseRpcController configuredController = configureRpcController( + Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call")); // This method does not throw any exceptions, so the caller must provide a // HBaseRpcController which is used to pass the exceptions. - this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done); + this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index ce2bd11f960..cd8035fd58e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -35,7 +35,6 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; -import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; import java.util.ArrayDeque; import java.util.Locale; @@ -44,7 +43,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import javax.security.sasl.SaslException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -52,7 +50,6 @@ import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; import org.apache.hadoop.hbase.log.HBaseMarkers; -import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; @@ -69,11 +66,13 @@ import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; @@ -256,16 +255,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { if (this.rpcClient.localAddr != null) { this.socket.bind(this.rpcClient.localAddr); } - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookups(); - } - InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress()); - if (remoteAddr.isUnresolved()) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookupsFailed(); - } - throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); - } + InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics); NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO); this.socket.setSoTimeout(this.rpcClient.readTO); return; @@ -374,15 +364,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { if (this.metrics != null) { this.metrics.incrNsLookups(); } - InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress()); - if (serverAddr.isUnresolved()) { - if (this.metrics != null) { - this.metrics.incrNsLookupsFailed(); - } - throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); - } saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token, - serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed, + socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 609d2c12cea..d0a13ca33d6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -32,17 +32,16 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; -import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler; import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; import org.apache.hadoop.hbase.security.SaslChallengeDecoder; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; @@ -210,18 +209,9 @@ class NettyRpcConnection extends RpcConnection { Promise saslPromise = ch.eventLoop().newPromise(); final NettyHBaseSaslRpcClientHandler saslHandler; try { - if (this.metrics != null) { - this.metrics.incrNsLookups(); - } - InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress()); - if (serverAddr.isUnresolved()) { - if (this.metrics != null) { - this.metrics.incrNsLookupsFailed(); - } - throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); - } saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token, - serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf); + ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo, + rpcClient.fallbackAllowed, this.rpcClient.conf); } catch (IOException e) { failInit(ch, e); return; @@ -282,16 +272,7 @@ class NettyRpcConnection extends RpcConnection { private void connect() throws UnknownHostException { assert eventLoop.inEventLoop(); LOG.trace("Connecting to {}", remoteId.getAddress()); - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookups(); - } - InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress()); - if (remoteAddr.isUnresolved()) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookupsFailed(); - } - throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); - } + InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics); this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass) .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 6749efe66f3..b2c7eeae4a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.SecurityInfo; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; @@ -122,7 +125,7 @@ abstract class RpcConnection { this.remoteId = remoteId; } - protected void scheduleTimeoutTask(final Call call) { + protected final void scheduleTimeoutTask(final Call call) { if (call.timeout > 0) { call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() { @@ -137,7 +140,7 @@ abstract class RpcConnection { } } - protected byte[] getConnectionHeaderPreamble() { + protected final byte[] getConnectionHeaderPreamble() { // Assemble the preamble up in a buffer first and then send it. Writing individual elements, // they are getting sent across piecemeal according to wireshark and then server is messing // up the reading on occasion (the passed in stream is not buffered yet). @@ -153,7 +156,7 @@ abstract class RpcConnection { return preamble; } - protected ConnectionHeader getConnectionHeader() { + protected final ConnectionHeader getConnectionHeader() { final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); builder.setServiceName(remoteId.getServiceName()); final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket); @@ -176,6 +179,21 @@ abstract class RpcConnection { return builder.build(); } + protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics) + throws UnknownHostException { + if (metrics != null) { + metrics.incrNsLookups(); + } + InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress()); + if (remoteAddr.isUnresolved()) { + if (metrics != null) { + metrics.incrNsLookupsFailed(); + } + throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); + } + return remoteAddr; + } + protected abstract void callTimeout(Call call); public ConnectionId remoteId() {