diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index d773ffe83ea..879e8e3cbcd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
 
 import com.google.protobuf.BlockingRpcChannel;
@@ -27,8 +28,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
@@ -220,7 +219,6 @@ class ConnectionManager {
   public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
   private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
-  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
 
   // An LRU Map of HConnectionKey -> HConnection (TableServer). All
   // access must be synchronized. This map is not private because tests
@@ -564,7 +562,6 @@ class ConnectionManager {
       justification="Access to the conncurrent hash map is under a lock so should be fine.")
   static class HConnectionImplementation implements ClusterConnection, Closeable {
     static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
-    private final boolean hostnamesCanChange;
     private final long pause;
     private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
     private boolean useMetaReplicas;
@@ -706,7 +703,6 @@ class ConnectionManager {
       }
 
       this.metaCache = new MetaCache(this.metrics);
-      this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
       boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
           HConstants.STATUS_PUBLISHED_DEFAULT);
       Class<? extends ClusterStatusListener.Listener> listenerClass =
@@ -1599,8 +1595,7 @@ class ConnectionManager {
         throw new MasterNotRunningException(sn + " is dead.");
       }
       // Use the security info interface name as our stub key
-      String key = getStubKey(getServiceName(),
-          sn.getHostname(), sn.getPort(), hostnamesCanChange);
+      String key = getStubKey(getServiceName(), sn);
       connectionLock.putIfAbsent(key, key);
       Object stub = null;
       synchronized (connectionLock.get(key)) {
@@ -1682,8 +1677,7 @@ class ConnectionManager {
       if (isDeadServer(serverName)) {
         throw new RegionServerStoppedException(serverName + " is dead.");
       }
-      String key = getStubKey(AdminService.BlockingInterface.class.getName(),
-          serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange);
+      String key = getStubKey(AdminService.BlockingInterface.class.getName(), serverName);
       this.connectionLock.putIfAbsent(key, key);
       AdminService.BlockingInterface stub = null;
       synchronized (this.connectionLock.get(key)) {
@@ -1704,8 +1698,7 @@ class ConnectionManager {
       if (isDeadServer(sn)) {
         throw new RegionServerStoppedException(sn + " is dead.");
       }
-      String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
-          sn.getPort(), this.hostnamesCanChange);
+      String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn);
       this.connectionLock.putIfAbsent(key, key);
       ClientService.BlockingInterface stub = null;
       synchronized (this.connectionLock.get(key)) {
@@ -1722,26 +1715,6 @@ class ConnectionManager {
       return stub;
     }
 
-    static String getStubKey(final String serviceName,
-                             final String rsHostname,
-                             int port,
-                             boolean resolveHostnames) {
-
-      // Sometimes, servers go down and they come back up with the same hostname but a different
-      // IP address. Force a resolution of the rsHostname by trying to instantiate an
-      // InetSocketAddress, and this way we will rightfully get a new stubKey.
-      // Also, include the hostname in the key so as to take care of those cases where the
-      // DNS name is different but IP address remains the same.
-      String address = rsHostname;
-      if (resolveHostnames) {
-        InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
-        if (i != null) {
-          address = i.getHostAddress() + "-" + rsHostname;
-        }
-      }
-      return serviceName + "@" + address + ":" + port;
-    }
-
     private ZooKeeperKeepAliveConnection keepAliveZookeeper;
     private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
     private boolean canCloseZKW = true;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index d97083dc361..12fdf9644fd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -189,6 +189,13 @@ public class ConnectionUtils {
     }
   }
 
+  /**
+   * Get a unique key for the rpc stub to the given server.
+   */
+  static String getStubKey(String serviceName, ServerName serverName) {
+    return String.format("%s@%s", serviceName, serverName);
+  }
+
   // A byte array in which all elements are the max byte, and it is used to
   // construct closest front row
   static final byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
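With the old key scheme gone, the stub key is derived from the ServerName alone: computing a key no longer performs a name service lookup, and a server that comes back on the same hostname with a new IP keeps the same key while the connection layer handles re-resolution. A minimal sketch of the new key shape, using a hypothetical ServerName (host, port, and startcode below are illustrative):

import org.apache.hadoop.hbase.ServerName;

public class StubKeyShape {
  public static void main(String[] args) {
    // Hypothetical server identity; ServerName.toString() renders as host,port,startcode.
    ServerName sn = ServerName.valueOf("rs-1.example.com", 16020, 1477000000000L);
    // New shape, as computed by ConnectionUtils.getStubKey():
    String key = String.format("%s@%s", "ClientService", sn);
    System.out.println(key); // ClientService@rs-1.example.com,16020,1477000000000
    // The old shape embedded the resolved IP, e.g.
    // "ClientService@10.0.0.5-rs-1.example.com:16020", which cost a DNS lookup per fetch.
  }
}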
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index 1896df4ac57..d9dd2b6702d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -63,6 +63,8 @@ public class MetricsConnection implements StatisticTrackable {
   private static final String HEAP_BASE = "heapOccupancy_";
   private static final String CACHE_BASE = "cacheDroppingExceptions_";
   private static final String UNKNOWN_EXCEPTION = "UnknownException";
+  private static final String NS_LOOKUPS = "nsLookups";
+  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
   private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
 
   /** A container class for collecting details about the RPC call as it percolates.
    */
@@ -287,6 +289,8 @@ public class MetricsConnection implements StatisticTrackable {
   protected final Counter hedgedReadOps;
   protected final Counter hedgedReadWin;
   protected final Histogram concurrentCallsPerServerHist;
+  protected final Counter nsLookups;
+  protected final Counter nsLookupsFailed;
 
   // dynamic metrics
@@ -359,6 +363,8 @@ public class MetricsConnection implements StatisticTrackable {
     this.runnerStats = new RunnerStats(this.registry);
     this.concurrentCallsPerServerHist = registry.newHistogram(this.getClass(),
         "concurrentCallsPerServer", scope);
+    this.nsLookups = registry.newCounter(this.getClass(), NS_LOOKUPS, scope);
+    this.nsLookupsFailed = registry.newCounter(this.getClass(), NS_LOOKUPS_FAILED, scope);
 
     this.reporter = new JmxReporter(this.registry);
     this.reporter.start();
@@ -524,4 +530,12 @@ public class MetricsConnection implements StatisticTrackable {
         (exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
         cacheDroppingExceptions, counterFactory).inc();
   }
+
+  public void incrNsLookups() {
+    this.nsLookups.inc();
+  }
+
+  public void incrNsLookupsFailed() {
+    this.nsLookupsFailed.inc();
+  }
 }
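The new counters register with the same per-connection registry as the existing metrics, so once client-side metrics are enabled they surface through the connection's JmxReporter alongside the other client stats. A hedged sketch of the guard-then-increment pattern the callers below follow (metrics can be null when client-side metrics are disabled):

import org.apache.hadoop.hbase.client.MetricsConnection;

class NsLookupAccounting {
  private final MetricsConnection metrics; // null when client-side metrics are disabled

  NsLookupAccounting(MetricsConnection metrics) {
    this.metrics = metrics;
  }

  // Mirrors the null-checked increments used at every resolution site in this patch.
  void recordLookup(boolean resolved) {
    if (metrics == null) {
      return;
    }
    metrics.incrNsLookups();
    if (!resolved) {
      metrics.incrNsLookupsFailed();
    }
  }
}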
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 2819e70af9f..3e5cb21db81 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -142,10 +143,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
 
   private int maxConcurrentCallsPerServer;
 
-  private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
+  private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache =
       CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
-          build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
-            @Override public AtomicInteger load(InetSocketAddress key) throws Exception {
+          build(new CacheLoader<Address, AtomicInteger>() {
+            @Override public AtomicInteger load(Address key) throws Exception {
               return new AtomicInteger(0);
             }
           });
@@ -213,7 +214,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
           // have some pending calls on connection so we should not shutdown the connection outside.
          // The connection itself will disconnect if there is no pending call for maxIdleTime.
          if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
-            LOG.info("Cleanup idle connection to " + conn.remoteId().address);
+            LOG.info("Cleanup idle connection to " + conn.remoteId().getAddress());
            connections.removeValue(conn.remoteId(), conn);
            conn.cleanupConnection();
          }
@@ -342,11 +343,11 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
   private T getConnection(ConnectionId remoteId) throws IOException {
     if (failedServers.isFailedServer(remoteId.getAddress())) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Not trying to connect to " + remoteId.address
+        LOG.debug("Not trying to connect to " + remoteId.getAddress()
             + " this server is in the failed servers list");
       }
       throw new FailedServerException(
-          "This server is in the failed servers list: " + remoteId.address);
+          "This server is in the failed servers list: " + remoteId.getAddress());
     }
     T conn;
     synchronized (connections) {
@@ -368,7 +369,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
    */
   protected abstract T createConnection(ConnectionId remoteId) throws IOException;
 
-  private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
+  private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
       RpcCallback<Message> callback) {
     call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
     if (metrics != null) {
@@ -393,10 +394,11 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
   }
 
   private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
-      final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
-      final RpcCallback<Message> callback) {
+      final Message param, Message returnType, final User ticket,
+      final InetSocketAddress inetAddr, final RpcCallback<Message> callback) {
     final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
     cs.setStartTime(EnvironmentEdgeManager.currentTime());
+    final Address addr = Address.fromSocketAddress(inetAddr);
     final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
     Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
         hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@@ -420,12 +422,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
     }
   }
 
-  private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
-    InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
-    if (addr.isUnresolved()) {
-      throw new UnknownHostException("can not resolve " + sn.getServerName());
-    }
-    return addr;
+  private static Address createAddr(ServerName sn) {
+    return Address.fromParts(sn.getHostname(), sn.getPort());
   }
 
   /**
@@ -440,8 +438,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
     synchronized (connections) {
       for (T connection : connections.values()) {
         ConnectionId remoteId = connection.remoteId();
-        if (remoteId.address.getPort() == sn.getPort() &&
-            remoteId.address.getHostName().equals(sn.getHostname())) {
+        if (remoteId.getAddress().getPort() == sn.getPort() &&
+            remoteId.getAddress().getHostname().equals(sn.getHostname())) {
           LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " +
               connection.remoteId);
           connections.removeValue(remoteId, connection);
@@ -500,19 +498,25 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
 
   @Override
   public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
-      int rpcTimeout) throws UnknownHostException {
+      int rpcTimeout) {
     return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
   }
 
   @Override
-  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
-      throws UnknownHostException {
+  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
     return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
   }
 
   private static class AbstractRpcChannel {
 
-    protected final InetSocketAddress addr;
+    protected final Address addr;
+
+    // We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
+    // per method call on the channel. If the remote target is removed or reprovisioned and
+    // its identity changes a new channel with a newly resolved InetSocketAddress will be
+    // created as part of retry, so caching here is fine.
+    // Normally, caching an InetSocketAddress is an anti-pattern.
+    protected InetSocketAddress isa;
 
     protected final AbstractRpcClient rpcClient;
 
@@ -520,7 +524,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
 
     protected final int rpcTimeout;
 
-    protected AbstractRpcChannel(AbstractRpcClient rpcClient, InetSocketAddress addr,
+    protected AbstractRpcChannel(AbstractRpcClient rpcClient, Address addr,
         User ticket, int rpcTimeout) {
       this.addr = addr;
       this.rpcClient = rpcClient;
@@ -557,15 +561,29 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
       implements BlockingRpcChannel {
 
     protected BlockingRpcChannelImplementation(AbstractRpcClient rpcClient,
-        InetSocketAddress addr, User ticket, int rpcTimeout) {
+        Address addr, User ticket, int rpcTimeout) {
       super(rpcClient, addr, ticket, rpcTimeout);
     }
 
     @Override
     public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
         Message param, Message returnType) throws ServiceException {
+      // Look up remote address upon first call
+      if (isa == null) {
+        if (this.rpcClient.metrics != null) {
+          this.rpcClient.metrics.incrNsLookups();
+        }
+        isa = Address.toSocketAddress(addr);
+        if (isa.isUnresolved()) {
+          if (this.rpcClient.metrics != null) {
+            this.rpcClient.metrics.incrNsLookupsFailed();
+          }
+          isa = null;
+          throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
+        }
+      }
       return rpcClient.callBlockingMethod(md, configureRpcController(controller),
-          param, returnType, ticket, addr);
+          param, returnType, ticket, isa);
     }
   }
 
@@ -575,20 +593,35 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
   public static class RpcChannelImplementation extends AbstractRpcChannel
       implements RpcChannel {
 
-    protected RpcChannelImplementation(AbstractRpcClient rpcClient, InetSocketAddress addr,
-        User ticket, int rpcTimeout) throws UnknownHostException {
+    protected RpcChannelImplementation(AbstractRpcClient rpcClient, Address addr,
+        User ticket, int rpcTimeout) {
       super(rpcClient, addr, ticket, rpcTimeout);
     }
 
     @Override
     public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
         Message param, Message returnType, RpcCallback<Message> done) {
+      HBaseRpcController configuredController =
+          configureRpcController(Preconditions.checkNotNull(controller,
+              "RpcController can not be null for async rpc call"));
+      // Look up remote address upon first call
+      if (isa == null || isa.isUnresolved()) {
+        if (this.rpcClient.metrics != null) {
+          this.rpcClient.metrics.incrNsLookups();
+        }
+        isa = Address.toSocketAddress(addr);
+        if (isa.isUnresolved()) {
+          if (this.rpcClient.metrics != null) {
+            this.rpcClient.metrics.incrNsLookupsFailed();
+          }
+          isa = null;
+          controller.setFailed(addr + " could not be resolved");
+          return;
+        }
+      }
       // This method does not throw any exceptions, so the caller must provide a
       // HBaseRpcController which is used to pass the exceptions.
-      this.rpcClient.callMethod(md,
-          configureRpcController(Preconditions.checkNotNull(controller,
-            "RpcController can not be null for async rpc call")),
-          param, returnType, ticket, addr, done);
+      this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
     }
   }
 }
\ No newline at end of file
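Since createAddr() no longer resolves, channel creation cannot fail with UnknownHostException; resolution is deferred to the first call and the resolved InetSocketAddress is cached on the channel, with the field reset on failure so the next call retries the lookup. A distilled sketch of that resolve-once pattern, independent of the HBase types:

import java.net.InetSocketAddress;
import java.net.UnknownHostException;

class LazyResolvingTarget {
  private final String host;
  private final int port;
  private InetSocketAddress isa; // cached after the first successful resolution

  LazyResolvingTarget(String host, int port) {
    this.host = host;
    this.port = port;
  }

  synchronized InetSocketAddress get() throws UnknownHostException {
    if (isa == null) {
      // new InetSocketAddress(host, port) triggers the DNS lookup.
      InetSocketAddress candidate = new InetSocketAddress(host, port);
      if (candidate.isUnresolved()) {
        // Leave isa null so a later call retries instead of caching the failure.
        throw new UnknownHostException(host + ":" + port + " could not be resolved");
      }
      isa = candidate;
    }
    return isa;
  }
}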
resolved"); + } + NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO); this.socket.setSoTimeout(this.rpcClient.readTO); return; } catch (SocketTimeoutException toe) { @@ -419,16 +428,16 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) { if (LOG.isDebugEnabled()) { - LOG.debug("Not trying to connect to " + remoteId.address + + LOG.debug("Not trying to connect to " + remoteId.getAddress() + " this server is in the failed servers list"); } throw new FailedServerException( - "This server is in the failed servers list: " + remoteId.address); + "This server is in the failed servers list: " + remoteId.getAddress()); } try { if (LOG.isDebugEnabled()) { - LOG.debug("Connecting to " + remoteId.address); + LOG.debug("Connecting to " + remoteId.getAddress()); } short numRetries = 0; @@ -480,14 +489,14 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { closeSocket(); IOException e = ExceptionUtil.asInterrupt(t); if (e == null) { - this.rpcClient.failedServers.addToFailedServers(remoteId.address, t); + this.rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), t); if (t instanceof LinkageError) { // probably the hbase hadoop version does not match the running hadoop version e = new DoNotRetryIOException(t); } else if (t instanceof IOException) { e = (IOException) t; } else { - e = new IOException("Could not set up IO Streams to " + remoteId.address, t); + e = new IOException("Could not set up IO Streams to " + remoteId.getAddress(), t); } } throw e; @@ -685,7 +694,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { if (callSender != null) { callSender.interrupt(); } - closeConn(new IOException("connection to " + remoteId.address + " closed")); + closeConn(new IOException("connection to " + remoteId.getAddress() + " closed")); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java index cf84c5a4532..3d03762cbb6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java @@ -18,10 +18,9 @@ package org.apache.hadoop.hbase.ipc; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.User; -import java.net.InetSocketAddress; - /** * This class holds the address and the user ticket, etc. 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
index cf84c5a4532..3d03762cbb6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
@@ -18,10 +18,9 @@
 package org.apache.hadoop.hbase.ipc;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.User;
 
-import java.net.InetSocketAddress;
-
 /**
  * This class holds the address and the user ticket, etc. The client connections
  * to servers are uniquely identified by <remoteAddress, ticket, serviceName>
@@ -31,9 +30,9 @@ public class ConnectionId {
   private static final int PRIME = 16777619;
   final User ticket;
   final String serviceName;
-  final InetSocketAddress address;
+  final Address address;
 
-  public ConnectionId(User ticket, String serviceName, InetSocketAddress address) {
+  public ConnectionId(User ticket, String serviceName, Address address) {
     this.address = address;
     this.ticket = ticket;
     this.serviceName = serviceName;
@@ -43,7 +42,7 @@ public class ConnectionId {
     return this.serviceName;
   }
 
-  public InetSocketAddress getAddress() {
+  public Address getAddress() {
     return address;
   }
 
@@ -53,7 +52,7 @@ public class ConnectionId {
 
   @Override
   public String toString() {
-    return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
+    return this.address + "/" + this.serviceName + "/" + this.ticket;
   }
 
   @Override
@@ -102,7 +101,7 @@ public class ConnectionId {
     return true;
   }
 
-  public static int hashCode(User ticket, String serviceName, InetSocketAddress address){
+  public static int hashCode(User ticket, String serviceName, Address address){
     return (address.hashCode() +
         PRIME * (PRIME * serviceName.hashCode() ^
             (ticket == null ? 0 : ticket.hashCode())));
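Connections are now pooled by the unresolved host and port rather than by a resolved socket address, so two ConnectionIds for the same server compare equal regardless of what DNS currently returns. A small demonstration using the Address semantics this relies on (hostname below is hypothetical):

import org.apache.hadoop.hbase.net.Address;

public class PoolingKeyDemo {
  public static void main(String[] args) {
    Address a = Address.fromParts("rs-1.example.com", 16020);
    Address b = Address.fromParts("rs-1.example.com", 16020);
    // Equality and hashCode use only hostname and port, never a resolution result.
    System.out.println(a.equals(b));                  // true
    System.out.println(a.hashCode() == b.hashCode()); // true: hostname.hashCode() ^ port
  }
}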
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
index 2ba5a735c88..e53c36634b4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
@@ -17,16 +17,14 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.net.InetSocketAddress;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 
@@ -35,8 +33,8 @@ import org.apache.hadoop.hbase.util.Pair;
  */
 @InterfaceAudience.Private
 public class FailedServers {
-  private final LinkedList<Pair<Long, String>> failedServers = new
-      LinkedList<Pair<Long, String>>();
+  private final LinkedList<Pair<Long, Address>> failedServers = new
+      LinkedList<Pair<Long, Address>>();
   private final int recheckServersTimeout;
   private static final Log LOG = LogFactory.getLog(FailedServers.class);
 
@@ -48,12 +46,12 @@ public class FailedServers {
   /**
    * Add an address to the list of the failed servers list.
    */
-  public synchronized void addToFailedServers(InetSocketAddress address, Throwable throwable) {
+  public synchronized void addToFailedServers(Address address, Throwable throwable) {
     final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout;
-    failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
+    failedServers.addFirst(new Pair<Long, Address>(expiry, address));
     if (LOG.isDebugEnabled()) {
       LOG.debug(
-          "Added failed server with address " + address.toString() + " to list caused by "
+          "Added failed server with address " + address + " to list caused by "
               + throwable.toString());
     }
   }
@@ -63,22 +61,21 @@ public class FailedServers {
    *
    * @return true if the server is in the failed servers list
    */
-  public synchronized boolean isFailedServer(final InetSocketAddress address) {
+  public synchronized boolean isFailedServer(final Address address) {
     if (failedServers.isEmpty()) {
       return false;
     }
-    final String lookup = address.toString();
     final long now = EnvironmentEdgeManager.currentTime();
     // iterate, looking for the search entry and cleaning expired entries
-    Iterator<Pair<Long, String>> it = failedServers.iterator();
+    Iterator<Pair<Long, Address>> it = failedServers.iterator();
     while (it.hasNext()) {
-      Pair<Long, String> cur = it.next();
+      Pair<Long, Address> cur = it.next();
       if (cur.getFirst() < now) {
         it.remove();
       } else {
-        if (lookup.equals(cur.getSecond())) {
+        if (address.equals(cur.getSecond())) {
          return true;
        }
      }
    }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index b5cb76d2c84..c599328e5e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -24,7 +24,6 @@ import com.google.protobuf.Message;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.ConnectException;
-import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
@@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
@@ -159,7 +159,7 @@ class IPCUtil {
    * @param exception the relevant exception
    * @return an exception to throw
    */
-  static IOException wrapException(InetSocketAddress addr, Exception exception) {
+  static IOException wrapException(Address addr, Exception exception) {
     if (exception instanceof ConnectException) {
       // connection refused; include the host:port in the error
       return (ConnectException) new ConnectException(
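The failed-server list now stores Address entries and matches by Address equality instead of comparing InetSocketAddress string renderings, with expired entries still pruned during lookup. A usage sketch (server name hypothetical; the expiry window comes from hbase.ipc.client.failed.servers.expiry, which I believe defaults to 2000 ms):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.FailedServers;
import org.apache.hadoop.hbase.net.Address;

public class FailedServersDemo {
  public static void main(String[] args) {
    FailedServers fs = new FailedServers(new Configuration());
    Address rs = Address.fromParts("rs-1.example.com", 16020);
    fs.addToFailedServers(rs, new RuntimeException("connection refused"));
    // Plain Address equality now; no string rendering, no resolution.
    System.out.println(fs.isFailedServer(Address.fromParts("rs-1.example.com", 16020))); // true
  }
}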
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index b5fb7e41475..fa68c18680e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -42,6 +42,8 @@ import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
@@ -52,6 +54,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
 import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
@@ -85,7 +88,8 @@ class NettyRpcConnection extends RpcConnection {
 
   NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
     super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
-        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
+        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
+        rpcClient.metrics);
     this.rpcClient = rpcClient;
     byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
     this.connectionHeaderPreamble =
@@ -216,24 +220,33 @@ class NettyRpcConnection extends RpcConnection {
       });
   }
 
-  private void connect() {
+  private void connect() throws UnknownHostException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Connecting to " + remoteId.address);
+      LOG.debug("Connecting to " + remoteId.getAddress());
+    }
+    if (this.rpcClient.metrics != null) {
+      this.rpcClient.metrics.incrNsLookups();
+    }
+    InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
+    if (remoteAddr.isUnresolved()) {
+      if (this.rpcClient.metrics != null) {
+        this.rpcClient.metrics.incrNsLookupsFailed();
+      }
+      throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
     }
-
     this.channel = new Bootstrap().group(rpcClient.group).channel(rpcClient.channelClass)
         .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
         .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
         .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
-        .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
+        .remoteAddress(remoteAddr).connect().addListener(new ChannelFutureListener() {
 
           @Override
           public void operationComplete(ChannelFuture future) throws Exception {
             Channel ch = future.channel();
             if (!future.isSuccess()) {
               failInit(ch, toIOE(future.cause()));
-              rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
+              rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), future.cause());
               return;
             }
             ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 26a5739f200..8c5bb6e30be 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -21,7 +21,6 @@ import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.RpcChannel;
 
 import java.io.Closeable;
-import java.io.IOException;
 
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -65,10 +64,8 @@ public interface RpcClient extends Closeable {
    * @param rpcTimeout default rpc operation timeout
    *
    * @return A blocking rpc channel that goes via this rpc client instance.
-   * @throws IOException when channel could not be created
    */
-  BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
-      throws IOException;
+  BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout);
 
   /**
    * Creates a "channel" that can be used by a protobuf service. Useful setting up
@@ -80,8 +77,7 @@ public interface RpcClient extends Closeable {
    *
    * @return A rpc channel that goes via this rpc client instance.
    */
-  RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout)
-      throws IOException;
+  RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout);
 
   /**
    * Interrupt the connections to the given server. This should be called if the server
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index d39013bd270..bac018b2b2d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -24,6 +24,7 @@ import io.netty.util.TimerTask;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
@@ -32,7 +33,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
@@ -73,6 +76,8 @@ abstract class RpcConnection {
 
   protected final CompressionCodec compressor;
 
+  protected final MetricsConnection metrics;
+
   protected final HashedWheelTimer timeoutTimer;
 
   // the last time we were picked up from connection pool.
@@ -87,15 +92,12 @@ abstract class RpcConnection {
   private long lastForceReloginAttempt = -1;
 
   protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId,
-      String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor)
-      throws IOException {
-    if (remoteId.getAddress().isUnresolved()) {
-      throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
-    }
+      String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor,
+      MetricsConnection metrics) throws IOException {
     this.timeoutTimer = timeoutTimer;
     this.codec = codec;
     this.compressor = compressor;
-
+    this.metrics = metrics;
     UserGroupInformation ticket = remoteId.getTicket().getUGI();
     SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
     this.useSasl = isSecurityEnabled;
@@ -116,8 +118,18 @@ abstract class RpcConnection {
         if (serverKey == null) {
           throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
         }
+        if (metrics != null) {
+          metrics.incrNsLookups();
+        }
+        InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
+        if (remoteAddr.isUnresolved()) {
+          if (metrics != null) {
+            metrics.incrNsLookupsFailed();
+          }
+          throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
+        }
         serverPrincipal = SecurityUtil.getServerPrincipal(conf.get(serverKey),
-            remoteId.address.getAddress().getCanonicalHostName().toLowerCase());
+            remoteAddr.getAddress().getCanonicalHostName().toLowerCase());
         if (LOG.isDebugEnabled()) {
           LOG.debug("RPC Server Kerberos principal name for service=" + remoteId.getServiceName()
               + " is " + serverPrincipal);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
index 7ee2a3b68ba..1b209bd8833 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.net.Address;
 
 /**
  * Throw this in rpc call if there are too many pending requests for one region server
@@ -31,6 +32,13 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public class ServerTooBusyException extends DoNotRetryIOException {
 
+  private static final long serialVersionUID = 1L;
+
+  public ServerTooBusyException(Address address, long count){
+    super("There are "+count+" concurrent rpc requests for "+address);
+  }
+
+  @Deprecated
   public ServerTooBusyException(InetSocketAddress address, long count){
     super("There are "+count+" concurrent rpc requests for "+address);
   }
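The secure path is the one place a connection still must resolve eagerly: the Kerberos server principal substitutes the canonical hostname of the resolved address into the principal pattern, so the constructor performs (and counts) the lookup only when SASL is in play. A hedged sketch of that substitution using Hadoop's SecurityUtil (the pattern and realm below are illustrative):

import java.io.IOException;
import org.apache.hadoop.security.SecurityUtil;

class ServerPrincipalSketch {
  // For a pattern like "hbase/_HOST@EXAMPLE.COM", _HOST is replaced with the
  // canonical hostname, which is why a resolved InetSocketAddress is required here.
  static String serverPrincipal(String principalPattern, String canonicalHostname)
      throws IOException {
    return SecurityUtil.getServerPrincipal(principalPattern, canonicalHostname.toLowerCase());
  }
}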
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 1181bc8eb28..f20cad05162 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -2090,28 +2091,30 @@ public class ZKUtil {
     int port = sp.length > 1 ? Integer.parseInt(sp[1])
         : HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
 
-    Socket socket = new Socket();
-    InetSocketAddress sockAddr = new InetSocketAddress(host, port);
-    socket.connect(sockAddr, timeout);
-
-    socket.setSoTimeout(timeout);
-    PrintWriter out = new PrintWriter(new BufferedWriter(
-        new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)),
-        true);
-    BufferedReader in = new BufferedReader(
-        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
-    out.println("stat");
-    out.flush();
     ArrayList<String> res = new ArrayList<String>();
-    while (true) {
-      String line = in.readLine();
-      if (line != null) {
-        res.add(line);
-      } else {
-        break;
+    try (Socket socket = new Socket()) {
+      InetSocketAddress sockAddr = new InetSocketAddress(host, port);
+      if (sockAddr.isUnresolved()) {
+        throw new UnknownHostException(host + " cannot be resolved");
+      }
+      socket.connect(sockAddr, timeout);
+      socket.setSoTimeout(timeout);
+      PrintWriter out = new PrintWriter(new BufferedWriter(
+          new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)),
+          true);
+      BufferedReader in = new BufferedReader(
+          new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
+      out.println("stat");
+      out.flush();
+      while (true) {
+        String line = in.readLine();
+        if (line != null) {
+          res.add(line);
+        } else {
+          break;
+        }
       }
     }
-    socket.close();
     return res.toArray(new String[res.size()]);
   }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java
index 03e3ca5039f..cdbae6a2596 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestFailedServersLog.java
@@ -15,9 +15,8 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
-import java.net.InetSocketAddress;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.log4j.Appender;
@@ -39,7 +38,7 @@ import org.mockito.runners.MockitoJUnitRunner;
 @Category({ ClientTests.class, SmallTests.class })
 public class TestFailedServersLog {
   static final int TEST_PORT = 9999;
-  private InetSocketAddress addr;
+  private Address addr;
 
   @Mock
   private Appender mockAppender;
@@ -62,14 +61,14 @@ public class TestFailedServersLog {
     Throwable nullException = new NullPointerException();
 
     FailedServers fs = new FailedServers(new Configuration());
-    addr = new InetSocketAddress(TEST_PORT);
+    addr = Address.fromParts("localhost", TEST_PORT);
 
     fs.addToFailedServers(addr, nullException);
 
     Mockito.verify(mockAppender).doAppend((LoggingEvent) captorLoggingEvent.capture());
     LoggingEvent loggingEvent = (LoggingEvent) captorLoggingEvent.getValue();
     assertThat(loggingEvent.getLevel(), is(Level.DEBUG));
-    assertEquals("Added failed server with address " + addr.toString() + " to list caused by "
+    assertEquals("Added failed server with address " + addr + " to list caused by "
         + nullException.toString(), loggingEvent.getRenderedMessage());
   }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index 7c4ac029acd..eca6f8d075e 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -21,10 +21,10 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
 import static org.junit.Assert.assertTrue;
 
 import java.net.ConnectException;
-import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
@@ -35,7 +35,7 @@ public class TestIPCUtil {
 
   @Test
   public void testWrapException() throws Exception {
-    final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
+    final Address address = Address.fromParts("localhost", 0);
     assertTrue(wrapException(address, new ConnectException()) instanceof ConnectException);
     assertTrue(
       wrapException(address, new SocketTimeoutException()) instanceof SocketTimeoutException);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java
index e6ee60b40b8..6f9fd3ff7f0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.net;
 
+import java.net.InetSocketAddress;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
@@ -33,7 +35,7 @@ import com.google.common.net.HostAndPort;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class Address implements Comparable<Address> {
-  private HostAndPort hostAndPort;
+  private final HostAndPort hostAndPort;
 
   private Address(HostAndPort hostAndPort) {
     this.hostAndPort = hostAndPort;
@@ -47,6 +49,33 @@ public class Address implements Comparable<Address> {
     return new Address(HostAndPort.fromString(hostnameAndPort));
   }
 
+  public static Address fromSocketAddress(InetSocketAddress addr) {
+    return Address.fromParts(addr.getHostString(), addr.getPort());
+  }
+
+  public static InetSocketAddress toSocketAddress(Address addr) {
+    return new InetSocketAddress(addr.getHostName(), addr.getPort());
+  }
+
+  public static InetSocketAddress[] toSocketAddress(Address[] addrs) {
+    if (addrs == null) {
+      return null;
+    }
+    InetSocketAddress[] result = new InetSocketAddress[addrs.length];
+    for (int i = 0; i < addrs.length; i++) {
+      result[i] = toSocketAddress(addrs[i]);
+    }
+    return result;
+  }
+
+  public String getHostName() {
+    return this.hostAndPort.getHostText();
+  }
+
+  /**
+   * @deprecated Use {@link #getHostName()} instead
+   */
+  @Deprecated
   public String getHostname() {
     return this.hostAndPort.getHostText();
   }
@@ -69,7 +98,7 @@ public class Address implements Comparable<Address> {
     }
     if (other instanceof Address) {
       Address that = (Address)other;
-      return this.getHostname().equals(that.getHostname()) &&
+      return this.getHostName().equals(that.getHostName()) &&
           this.getPort() == that.getPort();
     }
     return false;
@@ -77,12 +106,12 @@ public class Address implements Comparable<Address> {
 
   @Override
   public int hashCode() {
-    return this.getHostname().hashCode() ^ getPort();
+    return this.getHostName().hashCode() ^ getPort();
  }
 
   @Override
   public int compareTo(Address that) {
-    int compare = this.getHostname().compareTo(that.getHostname());
+    int compare = this.getHostName().compareTo(that.getHostName());
     if (compare != 0) {
       return compare;
     }
diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
index f4e9621d206..afdf3eb3d7f 100644
--- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
+++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
@@ -96,6 +96,9 @@ public class MemcachedBlockCache implements BlockCache {
     // case.
     String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211");
     String[] servers = serverListString.split(",");
+    // MemcachedClient requires InetSocketAddresses, we have to create them now. Implies any
+    // resolved identities cannot have their address mappings changed while the MemcachedClient
+    // instance is alive. We won't get a chance to trigger re-resolution.
     List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(servers.length);
     for (String s:servers) {
       serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
     }
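The new helpers make the conversion direction explicit: toSocketAddress() is where the DNS lookup happens, at call time, while fromSocketAddress() uses getHostString() so it never triggers a reverse lookup. A round-trip sketch (hostname hypothetical):

import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.net.Address;

public class AddressRoundTrip {
  public static void main(String[] args) {
    Address addr = Address.fromParts("rs-1.example.com", 16020);
    // Resolution happens here, at conversion time, not when the Address was created.
    InetSocketAddress isa = Address.toSocketAddress(addr);
    if (isa.isUnresolved()) {
      System.err.println(addr + " could not be resolved");
      return;
    }
    // getHostString() returns the original hostname without a reverse DNS lookup.
    Address back = Address.fromSocketAddress(isa);
    System.out.println(addr.equals(back)); // true for a plain hostname round trip
  }
}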
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6f3d4626a86..531038c3722 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -36,6 +36,7 @@ import java.lang.reflect.Constructor;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -126,6 +127,7 @@ import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
 import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -283,15 +285,17 @@ public class HRegionServer extends HasThread implements
 
   /**
    * Map of encoded region names to the DataNode locations they should be hosted on
-   * We store the value as InetSocketAddress since this is used only in HDFS
+   * We store the value as Address since InetSocketAddress is required by the HDFS
    * API (create() that takes favored nodes as hints for placing file blocks).
    * We could have used ServerName here as the value class, but we'd need to
    * convert it to InetSocketAddress at some point before the HDFS API call, and
    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
-   * and here we really mean DataNode locations.
+   * and here we really mean DataNode locations. We don't store it as InetSocketAddress
+   * here because the conversion on demand from Address to InetSocketAddress will
+   * guarantee the resolution results will be fresh when we need it.
   */
-  protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
-      new ConcurrentHashMap<String, InetSocketAddress[]>();
+  protected final Map<String, Address[]> regionFavoredNodesMap =
+      new ConcurrentHashMap<String, Address[]>();
 
   /**
    * Set of regions currently being in recovering state which means it can accept writes(edits from
@@ -3169,8 +3173,17 @@ public class HRegionServer extends HasThread implements
     return this.onlineRegions.get(encodedRegionName);
   }
 
-  public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
-    return this.regionFavoredNodesMap.get(encodedRegionName);
+  /**
+   * Return the favored nodes for a region given its encoded name. Look at the
+   * comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
+   * here.
+   * @param encodedRegionName
+   * @return array of favored locations
+   * @throws UnknownHostException if a favored node location cannot be resolved
+   */
+  public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName)
+      throws UnknownHostException {
+    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
   }
 
   @Override
@@ -3178,7 +3191,6 @@ public class HRegionServer extends HasThread implements
     return this.onlineRegions.get(encodedRegionName);
   }
 
-
   @Override
   public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
     Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
@@ -3296,11 +3308,11 @@ public class HRegionServer extends HasThread implements
   @Override
   public void updateRegionFavoredNodesMapping(String encodedRegionName,
       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
-    InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
+    Address[] addr = new Address[favoredNodes.size()];
     // Refer to the comment on the declaration of regionFavoredNodesMap on why
-    // it is a map of region name to InetSocketAddress[]
+    // it is a map of region name to Address[]
     for (int i = 0; i < favoredNodes.size(); i++) {
-      addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
+      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(),
          favoredNodes.get(i).getPort());
    }
    regionFavoredNodesMap.put(encodedRegionName, addr);
@@ -3308,13 +3320,14 @@ public class HRegionServer extends HasThread implements
 
   /**
    * Return the favored nodes for a region given its encoded name. Look at the
-   * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
+   * comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
+   * here.
    * @param encodedRegionName
    * @return array of favored locations
    */
   @Override
   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
-    return regionFavoredNodesMap.get(encodedRegionName);
+    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 711f520dca1..042ca83fee7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -28,10 +28,8 @@ import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -44,6 +42,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -139,8 +138,7 @@ public class TestClientTimeouts {
 
     // Return my own instance, one that does random timeouts
     @Override
-    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
-        User ticket, int rpcTimeout) throws UnknownHostException {
+    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
       return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
     }
   }
@@ -156,7 +154,7 @@ public class TestClientTimeouts {
 
     RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn,
         final User ticket, final int rpcTimeout) {
-      super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
+      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
     }
 
     @Override
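The region server keeps favored nodes in their unresolved form and converts only at the point of use, so each HDFS create() call sees resolution results no staler than the call itself. A sketch of that store-unresolved, convert-on-demand shape (DataNode hostnames and port are illustrative; unresolvable entries come back as unresolved InetSocketAddresses rather than throwing here):

import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.net.Address;

class FavoredNodesSketch {
  // Stored form: unresolved Address[], as in regionFavoredNodesMap above.
  private final Address[] favoredNodes = new Address[] {
      Address.fromParts("datanode-1.example.com", 50010),
      Address.fromParts("datanode-2.example.com", 50010)
  };

  // Converted on demand, so the DNS answer is as fresh as the call that needs it.
  InetSocketAddress[] forHdfsCreate() {
    return Address.toSocketAddress(favoredNodes);
  }
}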
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
index d6f5c52b83c..29ba53585cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
@@ -26,8 +27,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.net.InetSocketAddress;
-
 @Category(MediumTests.class) // Can't be small, we're playing with the EnvironmentEdge
 public class TestHBaseClient {
 
@@ -38,11 +37,10 @@ public class TestHBaseClient {
     FailedServers fs = new FailedServers(new Configuration());
     Throwable testThrowable = new Throwable();//throwable already tested in TestFailedServers.java
 
-    InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12);
-    InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12); // same server as ia
-    InetSocketAddress ia3 = InetSocketAddress.createUnresolved("badtoo", 12);
-    InetSocketAddress ia4 = InetSocketAddress.createUnresolved("badtoo", 13);
-
+    Address ia = Address.fromParts("bad", 12);
+    Address ia2 = Address.fromParts("bad", 12); // same server as ia
+    Address ia3 = Address.fromParts("badtoo", 12);
+    Address ia4 = Address.fromParts("badtoo", 13);
 
     Assert.assertFalse( fs.isFailedServer(ia) );