diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 77bf8d0b0e3..2c4d62ead8f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -187,6 +187,9 @@ public final class HConstants { /** Parameter name for what master implementation to use. */ public static final String MASTER_IMPL= "hbase.master.impl"; + /** Parameter name for what hbase client implementation to use. */ + public static final String HBASECLIENT_IMPL= "hbase.hbaseclient.impl"; + /** Parameter name for how often threads should wake up */ public static final String THREAD_WAKE_FREQUENCY = "hbase.server.thread.wakefrequency"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java index 051446f59c8..14c7a9025d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hbase.ipc; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; import javax.net.SocketFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.io.Writable; @@ -32,7 +35,7 @@ import org.apache.hadoop.io.Writable; * Enables reuse/sharing of clients on a per SocketFactory basis. A client * establishes certain configuration dependent characteristics like timeouts, * tcp-keepalive (true or false), etc. For more details on the characteristics, - * look at {@link HBaseClient#HBaseClient(Class, Configuration, SocketFactory)} + * look at {@link HBaseClient#HBaseClient(Configuration, SocketFactory)} * Creation of dynamic proxies to protocols creates the clients (and increments * reference count once created), and stopping of the proxies leads to clearing * out references and when the reference drops to zero, the cache mapping is @@ -52,12 +55,29 @@ class ClientCache { * @param factory socket factory * @return an IPC client */ - protected synchronized HBaseClient getClient(Configuration conf, - SocketFactory factory) { + @SuppressWarnings("unchecked") + protected synchronized HBaseClient getClient(Configuration conf, SocketFactory factory) { + HBaseClient client = clients.get(factory); if (client == null) { + Class hbaseClientClass = (Class) conf + .getClass(HConstants.HBASECLIENT_IMPL, HBaseClient.class); + // Make an hbase client instead of hadoop Client. - client = new HBaseClient(conf, factory); + try { + Constructor cst = hbaseClientClass.getConstructor( + Configuration.class, SocketFactory.class); + client = cst.newInstance(conf, factory); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (NoSuchMethodException e) { + throw new RuntimeException("No matching constructor in "+hbaseClientClass.getName(), e); + } + clients.put(factory, client); } else { client.incCount(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 9c55d97f5f8..f8a0315972f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -38,6 +38,7 @@ import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.Random; @@ -69,6 +70,8 @@ import org.apache.hadoop.hbase.security.TokenInfo; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; import org.apache.hadoop.io.IOUtils; @@ -113,6 +116,7 @@ public class HBaseClient { protected final boolean tcpKeepAlive; // if T then use keepalives protected int pingInterval; // how often sends ping to the server in msecs protected int socketTimeout; // socket timeout + protected FailedServers failedServers; protected final SocketFactory socketFactory; // how to create sockets private int refCount = 1; @@ -124,6 +128,68 @@ public class HBaseClient { final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds final static int PING_CALL_ID = -1; + public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry"; + public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000; + + /** + * A class to manage a list of servers that failed recently. + */ + static class FailedServers { + private final LinkedList> failedServers = new + LinkedList>(); + private final int recheckServersTimeout; + + FailedServers(Configuration conf) { + this.recheckServersTimeout = conf.getInt( + FAILED_SERVER_EXPIRY_KEY, FAILED_SERVER_EXPIRY_DEFAULT); + } + + /** + * Add an address to the list of the failed servers list. + */ + public synchronized void addToFailedServers(InetSocketAddress address) { + final long expiry = EnvironmentEdgeManager.currentTimeMillis() + recheckServersTimeout; + failedServers.addFirst(new Pair(expiry, address.toString())); + } + + /** + * Check if the server should be considered as bad. Clean the old entries of the list. + * + * @return true if the server is in the failed servers list + */ + public synchronized boolean isFailedServer(final InetSocketAddress address) { + if (failedServers.isEmpty()) { + return false; + } + + final String lookup = address.toString(); + final long now = EnvironmentEdgeManager.currentTimeMillis(); + + // iterate, looking for the search entry and cleaning expired entries + Iterator> it = failedServers.iterator(); + while (it.hasNext()) { + Pair cur = it.next(); + if (cur.getFirst() < now) { + it.remove(); + } else { + if (lookup.equals(cur.getSecond())) { + return true; + } + } + } + + return false; + } + + } + + public static class FailedServerException extends IOException { + public FailedServerException(String s) { + super(s); + } + } + + /** * set the ping interval value in configuration * @@ -240,6 +306,15 @@ public class HBaseClient { tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(), new AuthenticationTokenSelector()); } + + /** + * Creates a connection. Can be overridden by a subclass for testing. + * @param remoteId - the ConnectionId to use for the connection creation. + */ + protected Connection createConnection(ConnectionId remoteId) throws IOException { + return new Connection(remoteId); + } + /** Thread that reads responses and notifies callers. Each connection owns a * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ @@ -263,7 +338,7 @@ public class HBaseClient { protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed protected IOException closeException; // close reason - public Connection(ConnectionId remoteId) throws IOException { + Connection(ConnectionId remoteId) throws IOException { if (remoteId.getAddress().isUnresolved()) { throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); @@ -359,17 +434,30 @@ public class HBaseClient { /** * Add a call to this connection's call queue and notify - * a listener; synchronized. - * Returns false if called during shutdown. + * a listener; synchronized. If the connection is dead, the call is not added, and the + * caller is notified. + * This function can return a connection that is already marked as 'shouldCloseConnection' + * It is up to the user code to check this status. * @param call to add - * @return true if the call was added. */ - protected synchronized boolean addCall(Call call) { - if (shouldCloseConnection.get()) - return false; - calls.put(call.id, call); - notify(); - return true; + protected synchronized void addCall(Call call) { + // If the connection is about to close, we manage this as if the call was already added + // to the connection calls list. If not, the connection creations are serialized, as + // mentioned in HBASE-6364 + if (this.shouldCloseConnection.get()) { + if (this.closeException == null) { + call.setException(new IOException( + "Call " + call.id + " not added as the connection " + remoteId + " is closing")); + } else { + call.setException(this.closeException); + } + synchronized (call) { + call.notifyAll(); + } + } else { + calls.put(call.id, call); + notify(); + } } /** This class sends a ping to the remote side when timeout on @@ -682,6 +770,18 @@ public class HBaseClient { return; } + if (failedServers.isFailedServer(remoteId.getAddress())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not trying to connect to " + server + + " this server is in the failed servers list"); + } + IOException e = new FailedServerException( + "This server is in the failed servers list: " + server); + markClosed(e); + close(); + throw e; + } + try { if (LOG.isDebugEnabled()) { LOG.debug("Connecting to "+server); @@ -698,7 +798,7 @@ public class HBaseClient { final InputStream in2 = inStream; final OutputStream out2 = outStream; UserGroupInformation ticket = remoteId.getTicket().getUGI(); - if (authMethod == AuthMethod.KERBEROS) {; + if (authMethod == AuthMethod.KERBEROS) { if (ticket != null && ticket.getRealUser() != null) { ticket = ticket.getRealUser(); } @@ -744,6 +844,7 @@ public class HBaseClient { return; } } catch (IOException e) { + failedServers.addToFailedServers(remoteId.address); markClosed(e); close(); @@ -1037,7 +1138,6 @@ public class HBaseClient { /** * Construct an IPC client whose values are of the {@link Message} * class. - * @param valueClass value class * @param conf configuration * @param factory socket factory */ @@ -1057,6 +1157,7 @@ public class HBaseClient { this.clusterId = conf.get(HConstants.CLUSTER_ID, "default"); this.connections = new PoolMap( getPoolType(conf), getPoolSize(conf)); + this.failedServers = new FailedServers(conf); } /** @@ -1297,20 +1398,22 @@ public class HBaseClient { * refs for keys in HashMap properly. For now its ok. */ ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); - do { - synchronized (connections) { - connection = connections.get(remoteId); - if (connection == null) { - connection = new Connection(remoteId); - connections.put(remoteId, connection); - } + synchronized (connections) { + connection = connections.get(remoteId); + if (connection == null) { + connection = createConnection(remoteId); + connections.put(remoteId, connection); } - } while (!connection.addCall(call)); + } + connection.addCall(call); //we don't invoke the method below inside "synchronized (connections)" //block above. The reason for that is if the server happens to be slow, //it will take longer to establish a connection and that will slow the //entire system down. + //Moreover, if the connection is currently created, there will be many threads + // waiting here; as setupIOstreams is synchronized. If the connection fails with a + // timeout, they will all fail simultaneously. This is checked in setupIOstreams. connection.setupIOstreams(); return connection; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java index 311db22b5a8..f3824c0f6f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java @@ -50,7 +50,7 @@ public class EnvironmentEdgeManager { * Resets the managed instance to the default instance: {@link * DefaultEnvironmentEdge}. */ - static void reset() { + public static void reset() { injectEdge(new DefaultEnvironmentEdge()); } @@ -60,7 +60,7 @@ public class EnvironmentEdgeManager { * * @param edge the new edge. */ - static void injectEdge(EnvironmentEdge edge) { + public static void injectEdge(EnvironmentEdge edge) { if (edge == null) { reset(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java index 853a9cd5c86..643ba0b7192 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java @@ -35,6 +35,10 @@ public class ManualEnvironmentEdge implements EnvironmentEdge { value = newValue; } + public void incValue(long addedValue) { + value += addedValue; + } + @Override public long currentTimeMillis() { return this.value; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java new file mode 100644 index 00000000000..235f829e31f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.net.InetSocketAddress; + +@Category(MediumTests.class) // Can't be small, we're playing with the EnvironmentEdge +public class TestHBaseClient { + + @Test + public void testFailedServer(){ + ManualEnvironmentEdge ee = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge( ee ); + HBaseClient.FailedServers fs = new HBaseClient.FailedServers(new Configuration()); + + InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12); + InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12); // same server as ia + InetSocketAddress ia3 = InetSocketAddress.createUnresolved("badtoo", 12); + InetSocketAddress ia4 = InetSocketAddress.createUnresolved("badtoo", 13); + + + Assert.assertFalse( fs.isFailedServer(ia) ); + + fs.addToFailedServers(ia); + Assert.assertTrue( fs.isFailedServer(ia) ); + Assert.assertTrue( fs.isFailedServer(ia2) ); + + ee.incValue( 1 ); + Assert.assertTrue( fs.isFailedServer(ia) ); + Assert.assertTrue( fs.isFailedServer(ia2) ); + + ee.incValue( HBaseClient.FAILED_SERVER_EXPIRY_DEFAULT + 1 ); + Assert.assertFalse( fs.isFailedServer(ia) ); + Assert.assertFalse( fs.isFailedServer(ia2) ); + + fs.addToFailedServers(ia); + fs.addToFailedServers(ia3); + fs.addToFailedServers(ia4); + + Assert.assertTrue( fs.isFailedServer(ia) ); + Assert.assertTrue( fs.isFailedServer(ia2) ); + Assert.assertTrue( fs.isFailedServer(ia3) ); + Assert.assertTrue( fs.isFailedServer(ia4) ); + + ee.incValue( HBaseClient.FAILED_SERVER_EXPIRY_DEFAULT + 1 ); + Assert.assertFalse( fs.isFailedServer(ia) ); + Assert.assertFalse( fs.isFailedServer(ia2) ); + Assert.assertFalse( fs.isFailedServer(ia3) ); + Assert.assertFalse( fs.isFailedServer(ia4) ); + + + fs.addToFailedServers(ia3); + Assert.assertFalse( fs.isFailedServer(ia4) ); + } +}