diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index b48729f7461..00ce2754c80 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -28,6 +28,8 @@ import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -74,8 +76,6 @@ class AsyncConnectionImpl implements AsyncConnection {
       .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
     10, TimeUnit.MILLISECONDS);
 
-  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
-
   private final Configuration conf;
 
   final AsyncConnectionConfiguration connConf;
@@ -90,8 +90,6 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   final RpcControllerFactory rpcControllerFactory;
 
-  private final boolean hostnameCanChange;
-
   private final AsyncRegionLocator locator;
 
   final AsyncRpcRetryingCallerFactory callerFactory;
@@ -134,7 +132,6 @@ class AsyncConnectionImpl implements AsyncConnection {
     }
     this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null));
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
     this.rpcTimeout =
       (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
     this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
@@ -250,7 +247,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
     return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
-      getStubKey(ClientService.getDescriptor().getName(), serverName, hostnameCanChange),
+      getStubKey(ClientService.getDescriptor().getName(), serverName),
      () -> createRegionServerStub(serverName));
   }
 
@@ -264,7 +261,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
     return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
-      getStubKey(AdminService.getDescriptor().getName(), serverName, hostnameCanChange),
+      getStubKey(AdminService.getDescriptor().getName(), serverName),
      () -> createAdminServerStub(serverName));
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index e9039c248fa..4daf2b90726 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -34,6 +34,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -157,9 +159,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
   private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
 
-  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
-
-  private final boolean hostnamesCanChange;
   private final long pause;
   private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
   // The mode tells if HedgedRead, LoadBalance mode is supported.
@@ -297,7 +296,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     boolean shouldListen =
       conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
-    this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
     Class<? extends ClusterStatusListener.Listener> listenerClass =
       conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
         ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
@@ -476,7 +474,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       throw new RegionServerStoppedException(masterServer + " is dead.");
     }
     String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
-      masterServer, this.hostnamesCanChange);
+      masterServer);
 
     return new HBaseHbck(
       (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
@@ -1242,7 +1240,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       }
       // Use the security info interface name as our stub key
       String key =
-        getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange);
+        getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn);
       MasterProtos.MasterService.BlockingInterface stub =
         (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
           BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
@@ -1290,8 +1288,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     if (isDeadServer(serverName)) {
       throw new RegionServerStoppedException(serverName + " is dead.");
     }
-    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
-      this.hostnamesCanChange);
+    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName);
     return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
       BlockingRpcChannel channel =
         this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
@@ -1306,7 +1303,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       throw new RegionServerStoppedException(serverName + " is dead.");
     }
     String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(),
-      serverName, this.hostnamesCanChange);
+      serverName);
     return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
       BlockingRpcChannel channel =
         this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index d6f21944a1a..df376bbe5d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
@@ -215,6 +216,13 @@ public final class ConnectionUtils {
     }
   }
 
+  /**
+   * Get a unique key for the rpc stub to the given server.
+   */
+  static String getStubKey(String serviceName, ServerName serverName) {
+    return String.format("%s@%s", serviceName, serverName);
+  }
+
   /**
    * Return retries + 1. The returned value will be in range [1, Integer.MAX_VALUE].
    */
@@ -222,28 +230,6 @@ public final class ConnectionUtils {
     return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1);
   }
 
-  /**
-   * Get a unique key for the rpc stub to the given server.
-   */
-  static String getStubKey(String serviceName, ServerName serverName, boolean hostnameCanChange) {
-    // Sometimes, servers go down and they come back up with the same hostname but a different
-    // IP address. Force a resolution of the rsHostname by trying to instantiate an
-    // InetSocketAddress, and this way we will rightfully get a new stubKey.
-    // Also, include the hostname in the key so as to take care of those cases where the
-    // DNS name is different but IP address remains the same.
-    String hostname = serverName.getHostname();
-    int port = serverName.getPort();
-    if (hostnameCanChange) {
-      try {
-        InetAddress ip = InetAddress.getByName(hostname);
-        return serviceName + "@" + hostname + "-" + ip.getHostAddress() + ":" + port;
-      } catch (UnknownHostException e) {
-        LOG.warn("Can not resolve " + hostname + ", please check your network", e);
-      }
-    }
-    return serviceName + "@" + hostname + ":" + port;
-  }
-
   static void checkHasFamilies(Mutation mutation) {
     Preconditions.checkArgument(mutation.numFamilies() > 0,
       "Invalid arguments to %s, zero columns specified", mutation.toString());
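Aside (illustrative only, not part of the patch): the new getStubKey never touches the resolver. Because ServerName's string form is "host,port,startcode", a server that restarts under the same hostname gets a new start code and therefore a new stub key, which is what the removed InetAddress.getByName() probe was trying to approximate. A minimal sketch of the new key:

    import org.apache.hadoop.hbase.ServerName;

    public class StubKeyExample {
      public static void main(String[] args) {
        // Hypothetical server; host and startcode are made up for illustration.
        ServerName sn = ServerName.valueOf("rs1.example.com", 16020, 1600000000000L);
        // Pure string formatting: no DNS lookup on the stub cache path.
        String key = String.format("%s@%s", "ClientService", sn);
        System.out.println(key); // ClientService@rs1.example.com,16020,1600000000000
      }
    }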
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index 993c6caae1a..8566ec551e7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -66,6 +66,8 @@ public class MetricsConnection implements StatisticTrackable {
   private static final String HEAP_BASE = "heapOccupancy_";
   private static final String CACHE_BASE = "cacheDroppingExceptions_";
   private static final String UNKNOWN_EXCEPTION = "UnknownException";
+  private static final String NS_LOOKUPS = "nsLookups";
+  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
   private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
 
   /** A container class for collecting details about the RPC call as it percolates. */
@@ -288,6 +290,8 @@ public class MetricsConnection implements StatisticTrackable {
   protected final Counter hedgedReadWin;
   protected final Histogram concurrentCallsPerServerHist;
   protected final Histogram numActionsPerServerHist;
+  protected final Counter nsLookups;
+  protected final Counter nsLookupsFailed;
 
   // dynamic metrics
 
@@ -350,6 +354,8 @@ public class MetricsConnection implements StatisticTrackable {
       "concurrentCallsPerServer", scope));
     this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
       "numActionsPerServer", scope));
+    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
+    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
 
     this.reporter = JmxReporter.forRegistry(this.registry).build();
     this.reporter.start();
@@ -518,4 +524,12 @@ public class MetricsConnection implements StatisticTrackable {
         (exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
         cacheDroppingExceptions, counterFactory).inc();
   }
+
+  public void incrNsLookups() {
+    this.nsLookups.inc();
+  }
+
+  public void incrNsLookupsFailed() {
+    this.nsLookupsFailed.inc();
+  }
 }
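Aside (illustrative only, not part of the patch): MetricsConnection is optional — connection code receives null when client metrics are disabled — which is why every resolution site in this patch guards the counter increments. The hypothetical helper below just names the pattern the patch repeats at each lookup point:

    import java.net.InetSocketAddress;
    import java.net.UnknownHostException;
    import org.apache.hadoop.hbase.client.MetricsConnection;
    import org.apache.hadoop.hbase.net.Address;

    final class ResolveWithMetrics { // hypothetical helper, not in the patch
      static InetSocketAddress resolve(Address addr, MetricsConnection metrics)
          throws UnknownHostException {
        if (metrics != null) { // metrics may be disabled, hence the null check
          metrics.incrNsLookups();
        }
        InetSocketAddress isa = Address.toSocketAddress(addr);
        if (isa.isUnresolved()) {
          if (metrics != null) {
            metrics.incrNsLookupsFailed();
          }
          throw new UnknownHostException(addr + " could not be resolved");
        }
        return isa;
      }
    }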
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 1879f26eafc..f536792bd26 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -134,10 +135,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
 
   private int maxConcurrentCallsPerServer;
 
-  private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
+  private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache =
       CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
-          build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
-            @Override public AtomicInteger load(InetSocketAddress key) throws Exception {
+          build(new CacheLoader<Address, AtomicInteger>() {
+            @Override public AtomicInteger load(Address key) throws Exception {
              return new AtomicInteger(0);
            }
          });
@@ -206,7 +207,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
         // The connection itself will disconnect if there is no pending call for maxIdleTime.
         if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
           if (LOG.isTraceEnabled()) {
-            LOG.trace("Cleanup idle connection to {}", conn.remoteId().address);
+            LOG.trace("Cleanup idle connection to {}", conn.remoteId().getAddress());
           }
           connections.remove(conn.remoteId(), conn);
           conn.cleanupConnection();
@@ -343,11 +344,11 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
   private T getConnection(ConnectionId remoteId) throws IOException {
     if (failedServers.isFailedServer(remoteId.getAddress())) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Not trying to connect to " + remoteId.address +
+        LOG.debug("Not trying to connect to " + remoteId.getAddress() +
            " this server is in the failed servers list");
       }
       throw new FailedServerException(
-          "This server is in the failed servers list: " + remoteId.address);
+          "This server is in the failed servers list: " + remoteId.getAddress());
     }
     T conn;
     synchronized (connections) {
@@ -365,7 +366,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
    */
   protected abstract T createConnection(ConnectionId remoteId) throws IOException;
 
-  private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
+  private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
       RpcCallback<Message> callback) {
     call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
     if (metrics != null) {
@@ -390,8 +391,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
   }
 
   Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
-      final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
-      final RpcCallback<Message> callback) {
+      final Message param, Message returnType, final User ticket,
+      final InetSocketAddress inetAddr, final RpcCallback<Message> callback) {
     final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
     cs.setStartTime(EnvironmentEdgeManager.currentTime());
 
@@ -405,6 +406,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
       cs.setNumActionsPerServer(numActions);
     }
 
+    final Address addr = Address.fromSocketAddress(inetAddr);
     final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
     Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
        hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@@ -429,12 +431,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
     return call;
   }
 
-  InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
-    InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
-    if (addr.isUnresolved()) {
-      throw new UnknownHostException("can not resolve " + sn.getServerName());
-    }
-    return addr;
+  private static Address createAddr(ServerName sn) {
+    return Address.fromParts(sn.getHostname(), sn.getPort());
   }
 
   /**
@@ -449,8 +447,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
     synchronized (connections) {
       for (T connection : connections.values()) {
         ConnectionId remoteId = connection.remoteId();
-        if (remoteId.address.getPort() == sn.getPort()
-            && remoteId.address.getHostName().equals(sn.getHostname())) {
+        if (remoteId.getAddress().getPort() == sn.getPort()
+            && remoteId.getAddress().getHostName().equals(sn.getHostname())) {
          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " +
              connection.remoteId);
          connections.remove(remoteId, connection);
@@ -509,19 +507,25 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
   @Override
   public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
-      int rpcTimeout) throws UnknownHostException {
+      int rpcTimeout) {
     return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
   }
 
   @Override
-  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
-      throws UnknownHostException {
+  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
     return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
   }
 
   private static class AbstractRpcChannel {
 
-    protected final InetSocketAddress addr;
+    protected final Address addr;
+
+    // We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
+    // per method call on the channel. If the remote target is removed or reprovisioned and
+    // its identity changes a new channel with a newly resolved InetSocketAddress will be
+    // created as part of retry, so caching here is fine.
+    // Normally, caching an InetSocketAddress is an anti-pattern.
+    protected InetSocketAddress isa;
 
     protected final AbstractRpcClient<?> rpcClient;
 
@@ -529,7 +533,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
 
     protected final int rpcTimeout;
 
-    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
+    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, Address addr,
        User ticket, int rpcTimeout) {
      this.addr = addr;
      this.rpcClient = rpcClient;
@@ -566,15 +570,29 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
       implements BlockingRpcChannel {
 
     protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
-        InetSocketAddress addr, User ticket, int rpcTimeout) {
+        Address addr, User ticket, int rpcTimeout) {
       super(rpcClient, addr, ticket, rpcTimeout);
     }
 
     @Override
     public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
         Message param, Message returnType) throws ServiceException {
+      // Look up remote address upon first call
+      if (isa == null) {
+        if (this.rpcClient.metrics != null) {
+          this.rpcClient.metrics.incrNsLookups();
+        }
+        isa = Address.toSocketAddress(addr);
+        if (isa.isUnresolved()) {
+          if (this.rpcClient.metrics != null) {
+            this.rpcClient.metrics.incrNsLookupsFailed();
+          }
+          isa = null;
+          throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
+        }
+      }
       return rpcClient.callBlockingMethod(md, configureRpcController(controller),
-          param, returnType, ticket, addr);
+          param, returnType, ticket, isa);
     }
   }
 
@@ -584,20 +602,35 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
   public static class RpcChannelImplementation extends AbstractRpcChannel
       implements RpcChannel {
 
-    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
-        User ticket, int rpcTimeout) throws UnknownHostException {
+    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr,
+        User ticket, int rpcTimeout) {
       super(rpcClient, addr, ticket, rpcTimeout);
     }
 
     @Override
     public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
        Message param, Message returnType, RpcCallback<Message> done) {
+      HBaseRpcController configuredController =
+        configureRpcController(Preconditions.checkNotNull(controller,
+          "RpcController can not be null for async rpc call"));
+      // Look up remote address upon first call
+      if (isa == null || isa.isUnresolved()) {
+        if (this.rpcClient.metrics != null) {
+          this.rpcClient.metrics.incrNsLookups();
+        }
+        isa = Address.toSocketAddress(addr);
+        if (isa.isUnresolved()) {
+          if (this.rpcClient.metrics != null) {
+            this.rpcClient.metrics.incrNsLookupsFailed();
+          }
+          isa = null;
+          controller.setFailed(addr + " could not be resolved");
+          return;
+        }
+      }
       // This method does not throw any exceptions, so the caller must provide a
       // HBaseRpcController which is used to pass the exceptions.
-      this.rpcClient.callMethod(md,
-        configureRpcController(Preconditions.checkNotNull(controller,
-          "RpcController can not be null for async rpc call")),
-        param, returnType, ticket, addr, done);
+      this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
     }
   }
 }
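Aside (illustrative only, not part of the patch): the channel classes above work because new InetSocketAddress(host, port) resolves eagerly but does not throw on failure — it returns an instance flagged unresolved. That is what lets callBlockingMethod()/callMethod() cache isa on the first successful lookup and reset it to null on failure, so the next call retries resolution:

    import java.net.InetSocketAddress;

    public class UnresolvedExample {
      public static void main(String[] args) {
        InetSocketAddress ok = new InetSocketAddress("localhost", 16020);
        System.out.println(ok.isUnresolved());  // false: lookup succeeded, safe to cache

        InetSocketAddress bad = new InetSocketAddress("no-such-host.invalid", 16020);
        System.out.println(bad.isUnresolved()); // true: channel fails the call, retries later
      }
    }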
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index 6d4babee745..ce2bd11f960 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
 import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -207,7 +209,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
      */
    public void cleanup(IOException e) {
      IOException ie = new ConnectionClosingException(
-          "Connection to " + remoteId.address + " is closing.");
+          "Connection to " + remoteId.getAddress() + " is closing.");
      for (Call call : callsToWrite) {
        call.setException(ie);
      }
@@ -217,12 +219,9 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
 
   BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
     super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
-        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
+        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
+        rpcClient.metrics);
     this.rpcClient = rpcClient;
-    if (remoteId.getAddress().isUnresolved()) {
-      throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
-    }
-
     this.connectionHeaderPreamble = getConnectionHeaderPreamble();
     ConnectionHeader header = getConnectionHeader();
     ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
@@ -257,7 +256,17 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
         if (this.rpcClient.localAddr != null) {
           this.socket.bind(this.rpcClient.localAddr);
         }
-        NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO);
+        if (this.rpcClient.metrics != null) {
+          this.rpcClient.metrics.incrNsLookups();
+        }
+        InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
+        if (remoteAddr.isUnresolved()) {
+          if (this.rpcClient.metrics != null) {
+            this.rpcClient.metrics.incrNsLookupsFailed();
+          }
+          throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
+        }
+        NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
         this.socket.setSoTimeout(this.rpcClient.readTO);
         return;
       } catch (SocketTimeoutException toe) {
@@ -362,8 +371,18 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
 
   private boolean setupSaslConnection(final InputStream in2, final OutputStream out2)
       throws IOException {
+    if (this.metrics != null) {
+      this.metrics.incrNsLookups();
+    }
+    InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
+    if (serverAddr.isUnresolved()) {
+      if (this.metrics != null) {
+        this.metrics.incrNsLookupsFailed();
+      }
+      throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
+    }
     saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
-        serverAddress, securityInfo, this.rpcClient.fallbackAllowed,
+        serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed,
        this.rpcClient.conf.get("hbase.rpc.protection",
            QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
        this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
@@ -440,16 +459,16 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
 
     if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Not trying to connect to " + remoteId.address +
+        LOG.debug("Not trying to connect to " + remoteId.getAddress() +
            " this server is in the failed servers list");
       }
       throw new FailedServerException(
-          "This server is in the failed servers list: " + remoteId.address);
+          "This server is in the failed servers list: " + remoteId.getAddress());
     }
 
     try {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Connecting to " + remoteId.address);
+        LOG.debug("Connecting to " + remoteId.getAddress());
       }
 
       short numRetries = 0;
@@ -504,14 +523,14 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
       closeSocket();
       IOException e = ExceptionUtil.asInterrupt(t);
       if (e == null) {
-        this.rpcClient.failedServers.addToFailedServers(remoteId.address, t);
+        this.rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), t);
         if (t instanceof LinkageError) {
           // probably the hbase hadoop version does not match the running hadoop version
           e = new DoNotRetryIOException(t);
         } else if (t instanceof IOException) {
           e = (IOException) t;
         } else {
-          e = new IOException("Could not set up IO Streams to " + remoteId.address, t);
+          e = new IOException("Could not set up IO Streams to " + remoteId.getAddress(), t);
         }
       }
       throw e;
@@ -768,7 +787,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
     if (callSender != null) {
       callSender.interrupt();
     }
-    closeConn(new IOException("connection to " + remoteId.address + " closed"));
+    closeConn(new IOException("connection to " + remoteId.getAddress() + " closed"));
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
index 1396f1e7abc..cac9ff27382 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.net.InetSocketAddress;
 import java.util.Objects;
 
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -32,9 +32,9 @@ class ConnectionId {
   private static final int PRIME = 16777619;
   final User ticket;
   final String serviceName;
-  final InetSocketAddress address;
+  final Address address;
 
-  public ConnectionId(User ticket, String serviceName, InetSocketAddress address) {
+  public ConnectionId(User ticket, String serviceName, Address address) {
     this.address = address;
     this.ticket = ticket;
     this.serviceName = serviceName;
@@ -44,7 +44,7 @@ class ConnectionId {
     return this.serviceName;
   }
 
-  public InetSocketAddress getAddress() {
+  public Address getAddress() {
     return address;
   }
 
@@ -73,7 +73,7 @@ class ConnectionId {
     return hashCode(ticket,serviceName,address);
   }
 
-  public static int hashCode(User ticket, String serviceName, InetSocketAddress address) {
+  public static int hashCode(User ticket, String serviceName, Address address) {
     return (address.hashCode() +
         PRIME * (PRIME * serviceName.hashCode() ^
        (ticket == null ? 0 : ticket.hashCode())));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
index 86b763b91b0..1a8bc0129ea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -25,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  */
 @InterfaceAudience.Private
 public class FailedServers {
-  private final Map<String, Long> failedServers = new HashMap<String, Long>();
+  private final Map<Address, Long> failedServers = new HashMap<Address, Long>();
   private long latestExpiry = 0;
   private final int recheckServersTimeout;
   private static final Logger LOG = LoggerFactory.getLogger(FailedServers.class);
@@ -45,13 +45,13 @@ public class FailedServers {
   /**
    * Add an address to the list of the failed servers list.
    */
-  public synchronized void addToFailedServers(InetSocketAddress address, Throwable throwable) {
+  public synchronized void addToFailedServers(Address address, Throwable throwable) {
     final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout;
-    this.failedServers.put(address.toString(), expiry);
+    this.failedServers.put(address, expiry);
     this.latestExpiry = expiry;
     if (LOG.isDebugEnabled()) {
       LOG.debug(
-          "Added failed server with address " + address.toString() + " to list caused by "
+          "Added failed server with address " + address + " to list caused by "
              + throwable.toString());
     }
   }
@@ -61,7 +61,7 @@ public class FailedServers {
    *
    * @return true if the server is in the failed servers list
    */
-  public synchronized boolean isFailedServer(final InetSocketAddress address) {
+  public synchronized boolean isFailedServer(final Address address) {
     if (failedServers.isEmpty()) {
       return false;
     }
@@ -70,15 +70,14 @@ public class FailedServers {
       failedServers.clear();
       return false;
     }
-    String key = address.toString();
-    Long expiry = this.failedServers.get(key);
+    Long expiry = this.failedServers.get(address);
     if (expiry == null) {
       return false;
     }
     if (expiry >= now) {
       return true;
     } else {
-      this.failedServers.remove(key);
+      this.failedServers.remove(address);
     }
     return false;
   }
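Aside (illustrative only, not part of the patch): keying FailedServers (and ConnectionId) by Address instead of by an InetSocketAddress or its toString() matters because an InetSocketAddress renders differently depending on whether it has been resolved, so the same server could fail to match the failed-servers list. Address is a plain host:port value object with stable equals()/hashCode():

    import java.net.InetSocketAddress;
    import org.apache.hadoop.hbase.net.Address;

    public class AddressKeyExample {
      public static void main(String[] args) {
        // toString() of an InetSocketAddress depends on resolution state
        // (exact forms vary by JDK):
        System.out.println(InetSocketAddress.createUnresolved("localhost", 9999)); // localhost:9999
        System.out.println(new InetSocketAddress("localhost", 9999)); // localhost/127.0.0.1:9999

        // Address compares purely on host and port:
        Address a = Address.fromParts("localhost", 9999);
        Address b = Address.fromParts("localhost", 9999);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
      }
    }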
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 1b1411c7b95..c952f738446 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.net.ConnectException;
-import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.TimeoutException;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;
@@ -175,7 +175,7 @@ class IPCUtil {
    * @return an exception to throw
    * @see ClientExceptionsUtil#isConnectionException(Throwable)
    */
-  static IOException wrapException(InetSocketAddress addr, Throwable error) {
+  static IOException wrapException(Address addr, Throwable error) {
     if (error instanceof ConnectException) {
       // connection refused; include the host:port in the error
       return (IOException) new ConnectException(
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index fc9f9793021..609d2c12cea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -24,12 +24,15 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
 import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
 import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
@@ -97,7 +100,8 @@ class NettyRpcConnection extends RpcConnection {
 
   NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
     super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
-        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
+        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
+        rpcClient.metrics);
     this.rpcClient = rpcClient;
     this.eventLoop = rpcClient.group.next();
     byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
@@ -206,8 +210,18 @@ class NettyRpcConnection extends RpcConnection {
     Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
     final NettyHBaseSaslRpcClientHandler saslHandler;
     try {
+      if (this.metrics != null) {
+        this.metrics.incrNsLookups();
+      }
+      InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
+      if (serverAddr.isUnresolved()) {
+        if (this.metrics != null) {
+          this.metrics.incrNsLookupsFailed();
+        }
+        throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
+      }
       saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
-          serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
+          serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
     } catch (IOException e) {
       failInit(ch, e);
       return;
@@ -265,23 +279,32 @@ class NettyRpcConnection extends RpcConnection {
     });
   }
 
-  private void connect() {
+  private void connect() throws UnknownHostException {
     assert eventLoop.inEventLoop();
-    LOG.trace("Connecting to {}", remoteId.address);
-
+    LOG.trace("Connecting to {}", remoteId.getAddress());
+    if (this.rpcClient.metrics != null) {
+      this.rpcClient.metrics.incrNsLookups();
+    }
+    InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
+    if (remoteAddr.isUnresolved()) {
+      if (this.rpcClient.metrics != null) {
+        this.rpcClient.metrics.incrNsLookupsFailed();
+      }
+      throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
+    }
     this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
        .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
        .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
        .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
-        .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
+        .remoteAddress(remoteAddr).connect().addListener(new ChannelFutureListener() {
 
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            Channel ch = future.channel();
            if (!future.isSuccess()) {
              failInit(ch, toIOE(future.cause()));
-              rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
+              rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), future.cause());
              return;
            }
            ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
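Aside (illustrative only, not part of the patch): since ConnectionId now carries an unresolved Address, every connect attempt above performs its own Address.toSocketAddress() conversion. A reconnect after failure therefore re-queries the resolver (subject to the JVM/OS resolver cache, e.g. networkaddress.cache.ttl) and can pick up a server that came back under the same name with a new IP:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.hbase.net.Address;

    public class ReconnectResolutionExample {
      public static void main(String[] args) {
        Address remote = Address.fromParts("rs1.example.com", 16020); // hypothetical host
        InetSocketAddress attempt1 = Address.toSocketAddress(remote); // lookup #1
        // ... connection fails, server is re-provisioned with a new IP ...
        InetSocketAddress attempt2 = Address.toSocketAddress(remote); // lookup #2, may differ
        System.out.println(attempt1 + " vs " + attempt2);
      }
    }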
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 877d9b0d5b9..5bb08152d30 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.Closeable;
-import java.io.IOException;
 
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -64,10 +63,8 @@ public interface RpcClient extends Closeable {
    * @param rpcTimeout default rpc operation timeout
    *
    * @return A blocking rpc channel that goes via this rpc client instance.
-   * @throws IOException when channel could not be created
    */
-  BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
-      throws IOException;
+  BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout);
 
   /**
    * Creates a "channel" that can be used by a protobuf service. Useful setting up
@@ -79,8 +76,7 @@ public interface RpcClient extends Closeable {
    *
    * @return A rpc channel that goes via this rpc client instance.
    */
-  RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout)
-      throws IOException;
+  RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout);
 
   /**
    * Interrupt the connections to the given server. This should be called if the server
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index 195a16d16d3..6749efe66f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -18,12 +18,11 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.security.SecurityInfo;
 import org.apache.hadoop.hbase.security.User;
@@ -59,8 +58,6 @@ abstract class RpcConnection {
 
   protected final Token<TokenIdentifier> token;
 
-  protected final InetAddress serverAddress;
-
   protected final SecurityInfo securityInfo;
 
   protected final int reloginMaxBackoff; // max pause before relogin on sasl failure
@@ -69,6 +66,8 @@ abstract class RpcConnection {
 
   protected final CompressionCodec compressor;
 
+  protected final MetricsConnection metrics;
+
   protected final HashedWheelTimer timeoutTimer;
 
   protected final Configuration conf;
@@ -83,17 +82,13 @@ abstract class RpcConnection {
   protected SaslClientAuthenticationProvider provider;
 
   protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId,
-      String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor)
-      throws IOException {
-    if (remoteId.getAddress().isUnresolved()) {
-      throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
-    }
-    this.serverAddress = remoteId.getAddress().getAddress();
+      String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor,
+      MetricsConnection metrics) throws IOException {
     this.timeoutTimer = timeoutTimer;
     this.codec = codec;
     this.compressor = compressor;
     this.conf = conf;
-
+    this.metrics = metrics;
     User ticket = remoteId.getTicket();
     this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
     this.useSasl = isSecurityEnabled;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
index 135c78d6674..eae9886ca55 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.ipc;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -29,7 +30,14 @@ import org.apache.yetus.audience.InterfaceAudience;
 @SuppressWarnings("serial")
 @InterfaceAudience.Public
 public class ServerTooBusyException extends DoNotRetryIOException {
+
+  public ServerTooBusyException(Address address, long count) {
+    super("Busy Server! " + count + " concurrent RPCs against " + address);
+  }
+
+  @Deprecated
   public ServerTooBusyException(InetSocketAddress address, long count) {
     super("Busy Server! " + count + " concurrent RPCs against " + address);
   }
+
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
index 0af01984218..40a38c706a1 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
@@ -90,14 +90,12 @@ public class TestMasterRegistryHedgedReads {
     }
 
     @Override
-    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
-        throws IOException {
+    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
-        throws IOException {
+    public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
       return new RpcChannelImpl();
     }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionId.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionId.java
index 3e10f7409c5..48a079d3e75 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionId.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionId.java
@@ -22,10 +22,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.net.InetSocketAddress;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -43,7 +43,7 @@ public class TestConnectionId {
   private User testUser1 =
     User.createUserForTesting(testConfig, "test", new String[]{"testgroup"});
   private User testUser2 =
     User.createUserForTesting(testConfig, "test", new String[]{"testgroup"});
   private String serviceName = "test";
-  private InetSocketAddress address = new InetSocketAddress(999);
+  private Address address = Address.fromParts("localhost", 999);
   private ConnectionId connectionId1 = new ConnectionId(testUser1, serviceName, address);
   private ConnectionId connectionId2 = new ConnectionId(testUser2, serviceName, address);
 
@@ -66,7 +66,7 @@ public class TestConnectionId {
 
   @Test
   public void testToString() {
-    String expectedString = "0.0.0.0/0.0.0.0:999/test/test (auth:SIMPLE)";
+    String expectedString = "localhost:999/test/test (auth:SIMPLE)";
     assertEquals(expectedString, connectionId1.toString());
   }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java
index eb1877f189d..4036a51f01c 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java
@@ -21,9 +21,9 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
-import java.net.InetSocketAddress;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.log4j.Appender;
@@ -51,7 +51,7 @@ public class TestFailedServersLog {
     HBaseClassTestRule.forClass(TestFailedServersLog.class);
 
   static final int TEST_PORT = 9999;
-  private InetSocketAddress addr;
+  private Address addr;
 
   @Mock
   private Appender mockAppender;
@@ -74,7 +74,7 @@ public class TestFailedServersLog {
     Throwable nullException = new NullPointerException();
 
     FailedServers fs = new FailedServers(new Configuration());
-    addr = new InetSocketAddress(TEST_PORT);
+    addr = Address.fromParts("localhost", TEST_PORT);
 
     fs.addToFailedServers(addr, nullException);
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index 9e1ab2eb9ff..d1443a1068a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -23,7 +23,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -32,6 +31,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.FutureUtils;
@@ -100,7 +100,7 @@ public class TestIPCUtil {
     for (Class<? extends Throwable> clazz : ClientExceptionsUtil.getConnectionExceptionTypes()) {
       exceptions.add(create(clazz));
     }
-    InetSocketAddress addr = InetSocketAddress.createUnresolved("127.0.0.1", 12345);
+    Address addr = Address.fromParts("127.0.0.1", 12345);
     for (Throwable exception : exceptions) {
       if (exception instanceof TimeoutException) {
         assertThat(IPCUtil.wrapException(addr, exception), instanceOf(TimeoutIOException.class));
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java
index ab75d6011ab..8782fe116b0 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java
@@ -26,9 +26,9 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
-import java.net.InetSocketAddress;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -59,7 +59,7 @@ public class TestNettyRpcConnection {
   public static void setUp() throws IOException {
     CLIENT = new NettyRpcClient(HBaseConfiguration.create());
     CONN = new NettyRpcConnection(CLIENT,
-      new ConnectionId(User.getCurrent(), "test", new InetSocketAddress("localhost", 1234)));
+      new ConnectionId(User.getCurrent(), "test", Address.fromParts("localhost", 1234)));
   }
 
   @AfterClass
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java
index 48fa522397c..725a3764a36 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.net;
 
+import java.net.InetSocketAddress;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -32,7 +34,7 @@ import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
  */
 @InterfaceAudience.Public
 public class Address implements Comparable<Address> {
-  private HostAndPort hostAndPort;
+  private final HostAndPort hostAndPort;
 
   private Address(HostAndPort hostAndPort) {
     this.hostAndPort = hostAndPort;
@@ -46,6 +48,33 @@ public class Address implements Comparable<Address> {
     return new Address(HostAndPort.fromString(hostnameAndPort));
   }
 
+  public static Address fromSocketAddress(InetSocketAddress addr) {
+    return Address.fromParts(addr.getHostString(), addr.getPort());
+  }
+
+  public static InetSocketAddress toSocketAddress(Address addr) {
+    return new InetSocketAddress(addr.getHostName(), addr.getPort());
+  }
+
+  public static InetSocketAddress[] toSocketAddress(Address[] addrs) {
+    if (addrs == null) {
+      return null;
+    }
+    InetSocketAddress[] result = new InetSocketAddress[addrs.length];
+    for (int i = 0; i < addrs.length; i++) {
+      result[i] = toSocketAddress(addrs[i]);
+    }
+    return result;
+  }
+
+  public String getHostName() {
+    return this.hostAndPort.getHost();
+  }
+
+  /**
+   * @deprecated Use {@link #getHostName()} instead
+   */
+  @Deprecated
   public String getHostname() {
     return this.hostAndPort.getHost();
   }
@@ -65,7 +94,7 @@ public class Address implements Comparable<Address> {
    * otherwise returns same as {@link #toString()}}
    */
   public String toStringWithoutDomain() {
-    String hostname = getHostname();
+    String hostname = getHostName();
     String [] parts = hostname.split("\\.");
     if (parts.length > 1) {
       for (String part: parts) {
@@ -86,7 +115,7 @@ public class Address implements Comparable<Address> {
     }
     if (other instanceof Address) {
       Address that = (Address)other;
-      return this.getHostname().equals(that.getHostname()) &&
+      return this.getHostName().equals(that.getHostName()) &&
          this.getPort() == that.getPort();
     }
     return false;
@@ -94,12 +123,12 @@ public class Address implements Comparable<Address> {
 
   @Override
   public int hashCode() {
-    return this.getHostname().hashCode() ^ getPort();
+    return this.getHostName().hashCode() ^ getPort();
   }
 
   @Override
   public int compareTo(Address that) {
-    int compare = this.getHostname().compareTo(that.getHostname());
+    int compare = this.getHostName().compareTo(that.getHostName());
     if (compare != 0) {
       return compare;
     }
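Aside (illustrative only, not part of the patch): these helpers make the resolution boundary explicit. Address.toSocketAddress() is the single place a forward lookup happens; fromSocketAddress() uses getHostString(), which returns the original hostname and never triggers a reverse lookup, so the round trip cannot block on DNS:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.hbase.net.Address;

    public class AddressConversionExample {
      public static void main(String[] args) {
        Address addr = Address.fromParts("rs1.example.com", 16020); // hypothetical host
        InetSocketAddress isa = Address.toSocketAddress(addr);      // forward lookup here
        if (isa.isUnresolved()) {
          System.out.println(addr + " could not be resolved");
          return;
        }
        Address back = Address.fromSocketAddress(isa); // getHostString(): no reverse lookup
        System.out.println(back.equals(addr));         // true
      }
    }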
diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
index 6ccd138f70d..246d7e0a138 100644
--- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
+++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
@@ -100,6 +100,9 @@ public class MemcachedBlockCache implements BlockCache {
     // case.
     String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211");
     String[] servers = serverListString.split(",");
+    // MemcachedClient requires InetSocketAddresses, so we have to create them now. This implies
+    // any resolved identities cannot have their address mappings changed while the
+    // MemcachedClient instance is alive. We won't get a chance to trigger re-resolution.
     List<InetSocketAddress> serverAddresses = new ArrayList<>(servers.length);
     for (String s: servers) {
       serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a443e7329e3..67a93a61954 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
 import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
@@ -316,14 +317,16 @@ public class HRegionServer extends Thread implements
   /**
    * Map of encoded region names to the DataNode locations they should be hosted on
-   * We store the value as InetSocketAddress since this is used only in HDFS
+   * We store the value as Address since InetSocketAddress is required by the HDFS
    * API (create() that takes favored nodes as hints for placing file blocks).
    * We could have used ServerName here as the value class, but we'd need to
    * convert it to InetSocketAddress at some point before the HDFS API call, and
    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
-   * and here we really mean DataNode locations.
+   * and here we really mean DataNode locations. We don't store it as InetSocketAddress
+   * here because the conversion on demand from Address to InetSocketAddress will
+   * guarantee the resolution results will be fresh when we need it.
    */
-  private final Map<String, InetSocketAddress[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
+  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
 
   private LeaseManager leaseManager;
 
@@ -3476,11 +3479,11 @@ public class HRegionServer extends Thread implements
   @Override
   public void updateRegionFavoredNodesMapping(String encodedRegionName,
       List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
-    InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
+    Address[] addr = new Address[favoredNodes.size()];
     // Refer to the comment on the declaration of regionFavoredNodesMap on why
-    // it is a map of region name to InetSocketAddress[]
+    // it is a map of region name to Address[]
     for (int i = 0; i < favoredNodes.size(); i++) {
-      addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
+      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(),
          favoredNodes.get(i).getPort());
     }
     regionFavoredNodesMap.put(encodedRegionName, addr);
@@ -3488,13 +3491,14 @@ public class HRegionServer extends Thread implements
 
   /**
    * Return the favored nodes for a region given its encoded name. Look at the
-   * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
-   *
+   * comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
+   * here.
+   * @param encodedRegionName
    * @return array of favored locations
    */
   @Override
   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
-    return regionFavoredNodesMap.get(encodedRegionName);
+    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
   }
 
   @Override
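Aside (illustrative only, not part of the patch): the favored-nodes map now stores unresolved Address values and converts to InetSocketAddress[] only when HDFS actually asks, so a DataNode that is re-addressed between calls is picked up on the next conversion. Roughly:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.hbase.net.Address;

    public class FavoredNodesExample {
      public static void main(String[] args) {
        // Stored once, unresolved (hostnames are hypothetical):
        Address[] favored = {
          Address.fromParts("dn1.example.com", 50010),
          Address.fromParts("dn2.example.com", 50010)
        };
        // Resolved on demand, per HDFS create() call:
        InetSocketAddress[] hints = Address.toSocketAddress(favored);
        System.out.println(hints.length); // 2
      }
    }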
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCIBadHostname.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCIBadHostname.java
deleted file mode 100644
index 83d4bfaf4bb..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCIBadHostname.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.fail;
-
-import java.net.UnknownHostException;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Tests that we fail fast when hostname resolution is not working and do not cache
- * unresolved InetSocketAddresses.
- */
-@Category({MediumTests.class, ClientTests.class})
-public class TestCIBadHostname {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestCIBadHostname.class);
-
-  private static HBaseTestingUtility testUtil;
-  private static ConnectionImplementation conn;
-
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    testUtil = HBaseTestingUtility.createLocalHTU();
-    testUtil.startMiniCluster();
-    conn = (ConnectionImplementation) testUtil.getConnection();
-  }
-
-  @AfterClass
-  public static void teardownAfterClass() throws Exception {
-    conn.close();
-    testUtil.shutdownMiniCluster();
-  }
-
-  @Test(expected = UnknownHostException.class)
-  public void testGetAdminBadHostname() throws Exception {
-    // verify that we can get an instance with the cluster hostname
-    ServerName master = testUtil.getHBaseCluster().getMaster().getServerName();
-    try {
-      conn.getAdmin(master);
-    } catch (UnknownHostException uhe) {
-      fail("Obtaining admin to the cluster master should have succeeded");
-    }
-
-    // test that we fail to get a client to an unresolvable hostname, which
-    // means it won't be cached
-    ServerName badHost =
-      ServerName.valueOf("unknownhost.invalid:" + HConstants.DEFAULT_MASTER_PORT,
-        System.currentTimeMillis());
-    conn.getAdmin(badHost);
-    fail("Obtaining admin to unresolvable hostname should have failed");
-  }
-
-  @Test(expected = UnknownHostException.class)
-  public void testGetClientBadHostname() throws Exception {
-    // verify that we can get an instance with the cluster hostname
-    ServerName rs = testUtil.getHBaseCluster().getRegionServer(0).getServerName();
-    try {
-      conn.getClient(rs);
-    } catch (UnknownHostException uhe) {
-      fail("Obtaining client to the cluster regionserver should have succeeded");
-    }
-
-    // test that we fail to get a client to an unresolvable hostname, which
-    // means it won't be cached
-    ServerName badHost =
-      ServerName.valueOf("unknownhost.invalid:" + HConstants.DEFAULT_REGIONSERVER_PORT,
-        System.currentTimeMillis());
-    conn.getAdmin(badHost);
-    fail("Obtaining client to unresolvable hostname should have failed");
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index aa8e10d5111..dbc0da981be 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -20,10 +20,8 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +36,7 @@ import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -147,14 +146,12 @@ public class TestClientTimeouts {
 
     // Return my own instance, one that does random timeouts
     @Override
-    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
-        User ticket, int rpcTimeout) throws UnknownHostException {
+    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
       return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
     }
 
     @Override
-    public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout)
-        throws UnknownHostException {
+    public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
       return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
     }
   }
@@ -170,7 +167,7 @@ public class TestClientTimeouts {
 
     RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn,
         final User ticket, final int rpcTimeout) {
-      super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
+      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
     }
 
     @Override
@@ -191,8 +188,8 @@ public class TestClientTimeouts {
   private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation {
 
     RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket,
-        int rpcTimeout) throws UnknownHostException {
-      super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
+        int rpcTimeout) {
+      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
index ad4741ccf8a..01b840cff66 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.net.InetSocketAddress;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -42,12 +42,11 @@ public class TestHBaseClient {
     FailedServers fs = new FailedServers(new Configuration());
     Throwable testThrowable = new Throwable();//throwable already tested in TestFailedServers.java
 
-    InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12);
+    Address ia = Address.fromParts("bad", 12);
     // same server as ia
-    InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12);
-    InetSocketAddress ia3 = InetSocketAddress.createUnresolved("badtoo", 12);
-    InetSocketAddress ia4 = InetSocketAddress.createUnresolved("badtoo", 13);
-
+    Address ia2 = Address.fromParts("bad", 12);
+    Address ia3 = Address.fromParts("badtoo", 12);
+    Address ia4 = Address.fromParts("badtoo", 13);
 
     Assert.assertFalse(fs.isFailedServer(ia));
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 25320080015..3184a59097a 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -2069,10 +2070,12 @@ public final class ZKUtil {
       int port = sp.length > 1 ? Integer.parseInt(sp[1])
         : HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
 
-      InetSocketAddress sockAddr = new InetSocketAddress(host, port);
       try (Socket socket = new Socket()) {
+        InetSocketAddress sockAddr = new InetSocketAddress(host, port);
+        if (sockAddr.isUnresolved()) {
+          throw new UnknownHostException(host + " cannot be resolved");
+        }
         socket.connect(sockAddr, timeout);
-
         socket.setSoTimeout(timeout);
         try (PrintWriter out = new PrintWriter(new BufferedWriter(
           new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)), true);
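
[Editor's note] Taken together, the TestHBaseClient and ZKUtil hunks lean on two properties of the unresolved-identity approach: equality of a server identity is defined by host and port alone, so it works as a failed-server or stub cache key without any lookup; and resolution happens only at the point of use, failing fast with UnknownHostException rather than caching a dead lookup. Below is a self-contained sketch of both properties, using a hypothetical HostPort class in place of org.apache.hadoop.hbase.net.Address.

import java.net.InetSocketAddress;
import java.net.UnknownHostException;

public final class HostPort {
  private final String host;
  private final int port;

  public HostPort(String host, int port) {
    this.host = host;
    this.port = port;
  }

  // Equality and hashing use only host and port, never a resolved IP, so instances
  // are stable map keys across DNS changes and for never-resolvable hosts alike.
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof HostPort)) {
      return false;
    }
    HostPort that = (HostPort) o;
    return port == that.port && host.equals(that.host);
  }

  @Override
  public int hashCode() {
    return host.hashCode() ^ port;
  }

  // Resolve on demand, mirroring the ZKUtil change above: attempt the lookup only
  // when the address is actually needed, and fail fast if it does not resolve.
  public InetSocketAddress resolve() throws UnknownHostException {
    InetSocketAddress isa = new InetSocketAddress(host, port);
    if (isa.isUnresolved()) {
      throw new UnknownHostException(host + " cannot be resolved");
    }
    return isa;
  }
}

Because no resolved address is ever embedded in a long-lived key, a retry after a DNS fix needs no cache invalidation: the next resolve() simply succeeds.
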