HBASE-6364 Powering down the server host holding the .META. table causes HBase Client to take excessively long to recover and connect to reassigned .META. table

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1375473 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2012-08-21 11:12:10 +00:00
parent 933c80f7f3
commit c87c7237c7
6 changed files with 237 additions and 26 deletions

View File

@ -187,6 +187,9 @@ public final class HConstants {
/** Parameter name for what master implementation to use. */
public static final String MASTER_IMPL= "hbase.master.impl";
/** Parameter name for what hbase client implementation to use. */
public static final String HBASECLIENT_IMPL= "hbase.hbaseclient.impl";
/** Parameter name for how often threads should wake up */
public static final String THREAD_WAKE_FREQUENCY = "hbase.server.thread.wakefrequency";

View File

@ -18,12 +18,15 @@
package org.apache.hadoop.hbase.ipc;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.io.Writable;
@ -32,7 +35,7 @@ import org.apache.hadoop.io.Writable;
* Enables reuse/sharing of clients on a per SocketFactory basis. A client
* establishes certain configuration dependent characteristics like timeouts,
* tcp-keepalive (true or false), etc. For more details on the characteristics,
* look at {@link HBaseClient#HBaseClient(Class, Configuration, SocketFactory)}
* look at {@link HBaseClient#HBaseClient(Configuration, SocketFactory)}
* Creation of dynamic proxies to protocols creates the clients (and increments
* reference count once created), and stopping of the proxies leads to clearing
* out references and when the reference drops to zero, the cache mapping is
@ -52,12 +55,29 @@ class ClientCache {
* @param factory socket factory
* @return an IPC client
*/
protected synchronized HBaseClient getClient(Configuration conf,
SocketFactory factory) {
@SuppressWarnings("unchecked")
protected synchronized HBaseClient getClient(Configuration conf, SocketFactory factory) {
HBaseClient client = clients.get(factory);
if (client == null) {
Class<? extends HBaseClient> hbaseClientClass = (Class<? extends HBaseClient>) conf
.getClass(HConstants.HBASECLIENT_IMPL, HBaseClient.class);
// Make an hbase client instead of hadoop Client.
client = new HBaseClient(conf, factory);
try {
Constructor<? extends HBaseClient> cst = hbaseClientClass.getConstructor(
Configuration.class, SocketFactory.class);
client = cst.newInstance(conf, factory);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
throw new RuntimeException("No matching constructor in "+hbaseClientClass.getName(), e);
}
clients.put(factory, client);
} else {
client.incCount();

View File

@ -38,6 +38,7 @@ import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
@ -69,6 +70,8 @@ import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.apache.hadoop.io.IOUtils;
@ -113,6 +116,7 @@ public class HBaseClient {
protected final boolean tcpKeepAlive; // if T then use keepalives
protected int pingInterval; // how often sends ping to the server in msecs
protected int socketTimeout; // socket timeout
protected FailedServers failedServers;
protected final SocketFactory socketFactory; // how to create sockets
private int refCount = 1;
@ -124,6 +128,68 @@ public class HBaseClient {
final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
final static int PING_CALL_ID = -1;
public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
/**
* A class to manage a list of servers that failed recently.
*/
static class FailedServers {
private final LinkedList<Pair<Long, String>> failedServers = new
LinkedList<Pair<Long, java.lang.String>>();
private final int recheckServersTimeout;
FailedServers(Configuration conf) {
this.recheckServersTimeout = conf.getInt(
FAILED_SERVER_EXPIRY_KEY, FAILED_SERVER_EXPIRY_DEFAULT);
}
/**
* Add an address to the list of the failed servers list.
*/
public synchronized void addToFailedServers(InetSocketAddress address) {
final long expiry = EnvironmentEdgeManager.currentTimeMillis() + recheckServersTimeout;
failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
}
/**
* Check if the server should be considered as bad. Clean the old entries of the list.
*
* @return true if the server is in the failed servers list
*/
public synchronized boolean isFailedServer(final InetSocketAddress address) {
if (failedServers.isEmpty()) {
return false;
}
final String lookup = address.toString();
final long now = EnvironmentEdgeManager.currentTimeMillis();
// iterate, looking for the search entry and cleaning expired entries
Iterator<Pair<Long, String>> it = failedServers.iterator();
while (it.hasNext()) {
Pair<Long, String> cur = it.next();
if (cur.getFirst() < now) {
it.remove();
} else {
if (lookup.equals(cur.getSecond())) {
return true;
}
}
}
return false;
}
}
public static class FailedServerException extends IOException {
public FailedServerException(String s) {
super(s);
}
}
/**
* set the ping interval value in configuration
*
@ -240,6 +306,15 @@ public class HBaseClient {
tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(),
new AuthenticationTokenSelector());
}
/**
* Creates a connection. Can be overridden by a subclass for testing.
* @param remoteId - the ConnectionId to use for the connection creation.
*/
protected Connection createConnection(ConnectionId remoteId) throws IOException {
return new Connection(remoteId);
}
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
@ -263,7 +338,7 @@ public class HBaseClient {
protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
protected IOException closeException; // close reason
public Connection(ConnectionId remoteId) throws IOException {
Connection(ConnectionId remoteId) throws IOException {
if (remoteId.getAddress().isUnresolved()) {
throw new UnknownHostException("unknown host: " +
remoteId.getAddress().getHostName());
@ -359,17 +434,30 @@ public class HBaseClient {
/**
* Add a call to this connection's call queue and notify
* a listener; synchronized.
* Returns false if called during shutdown.
* a listener; synchronized. If the connection is dead, the call is not added, and the
* caller is notified.
* This function can return a connection that is already marked as 'shouldCloseConnection'
* It is up to the user code to check this status.
* @param call to add
* @return true if the call was added.
*/
protected synchronized boolean addCall(Call call) {
if (shouldCloseConnection.get())
return false;
protected synchronized void addCall(Call call) {
// If the connection is about to close, we manage this as if the call was already added
// to the connection calls list. If not, the connection creations are serialized, as
// mentioned in HBASE-6364
if (this.shouldCloseConnection.get()) {
if (this.closeException == null) {
call.setException(new IOException(
"Call " + call.id + " not added as the connection " + remoteId + " is closing"));
} else {
call.setException(this.closeException);
}
synchronized (call) {
call.notifyAll();
}
} else {
calls.put(call.id, call);
notify();
return true;
}
}
/** This class sends a ping to the remote side when timeout on
@ -682,6 +770,18 @@ public class HBaseClient {
return;
}
if (failedServers.isFailedServer(remoteId.getAddress())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not trying to connect to " + server +
" this server is in the failed servers list");
}
IOException e = new FailedServerException(
"This server is in the failed servers list: " + server);
markClosed(e);
close();
throw e;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
@ -698,7 +798,7 @@ public class HBaseClient {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
UserGroupInformation ticket = remoteId.getTicket().getUGI();
if (authMethod == AuthMethod.KERBEROS) {;
if (authMethod == AuthMethod.KERBEROS) {
if (ticket != null && ticket.getRealUser() != null) {
ticket = ticket.getRealUser();
}
@ -744,6 +844,7 @@ public class HBaseClient {
return;
}
} catch (IOException e) {
failedServers.addToFailedServers(remoteId.address);
markClosed(e);
close();
@ -1037,7 +1138,6 @@ public class HBaseClient {
/**
* Construct an IPC client whose values are of the {@link Message}
* class.
* @param valueClass value class
* @param conf configuration
* @param factory socket factory
*/
@ -1057,6 +1157,7 @@ public class HBaseClient {
this.clusterId = conf.get(HConstants.CLUSTER_ID, "default");
this.connections = new PoolMap<ConnectionId, Connection>(
getPoolType(conf), getPoolSize(conf));
this.failedServers = new FailedServers(conf);
}
/**
@ -1297,20 +1398,22 @@ public class HBaseClient {
* refs for keys in HashMap properly. For now its ok.
*/
ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
do {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
connection = new Connection(remoteId);
connection = createConnection(remoteId);
connections.put(remoteId, connection);
}
}
} while (!connection.addCall(call));
connection.addCall(call);
//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
//Moreover, if the connection is currently created, there will be many threads
// waiting here; as setupIOstreams is synchronized. If the connection fails with a
// timeout, they will all fail simultaneously. This is checked in setupIOstreams.
connection.setupIOstreams();
return connection;
}

View File

@ -50,7 +50,7 @@ public class EnvironmentEdgeManager {
* Resets the managed instance to the default instance: {@link
* DefaultEnvironmentEdge}.
*/
static void reset() {
public static void reset() {
injectEdge(new DefaultEnvironmentEdge());
}
@ -60,7 +60,7 @@ public class EnvironmentEdgeManager {
*
* @param edge the new edge.
*/
static void injectEdge(EnvironmentEdge edge) {
public static void injectEdge(EnvironmentEdge edge) {
if (edge == null) {
reset();
} else {

View File

@ -35,6 +35,10 @@ public class ManualEnvironmentEdge implements EnvironmentEdge {
value = newValue;
}
public void incValue(long addedValue) {
value += addedValue;
}
@Override
public long currentTimeMillis() {
return this.value;

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.net.InetSocketAddress;
@Category(MediumTests.class) // Can't be small, we're playing with the EnvironmentEdge
public class TestHBaseClient {
@Test
public void testFailedServer(){
ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge( ee );
HBaseClient.FailedServers fs = new HBaseClient.FailedServers(new Configuration());
InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12);
InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12); // same server as ia
InetSocketAddress ia3 = InetSocketAddress.createUnresolved("badtoo", 12);
InetSocketAddress ia4 = InetSocketAddress.createUnresolved("badtoo", 13);
Assert.assertFalse( fs.isFailedServer(ia) );
fs.addToFailedServers(ia);
Assert.assertTrue( fs.isFailedServer(ia) );
Assert.assertTrue( fs.isFailedServer(ia2) );
ee.incValue( 1 );
Assert.assertTrue( fs.isFailedServer(ia) );
Assert.assertTrue( fs.isFailedServer(ia2) );
ee.incValue( HBaseClient.FAILED_SERVER_EXPIRY_DEFAULT + 1 );
Assert.assertFalse( fs.isFailedServer(ia) );
Assert.assertFalse( fs.isFailedServer(ia2) );
fs.addToFailedServers(ia);
fs.addToFailedServers(ia3);
fs.addToFailedServers(ia4);
Assert.assertTrue( fs.isFailedServer(ia) );
Assert.assertTrue( fs.isFailedServer(ia2) );
Assert.assertTrue( fs.isFailedServer(ia3) );
Assert.assertTrue( fs.isFailedServer(ia4) );
ee.incValue( HBaseClient.FAILED_SERVER_EXPIRY_DEFAULT + 1 );
Assert.assertFalse( fs.isFailedServer(ia) );
Assert.assertFalse( fs.isFailedServer(ia2) );
Assert.assertFalse( fs.isFailedServer(ia3) );
Assert.assertFalse( fs.isFailedServer(ia4) );
fs.addToFailedServers(ia3);
Assert.assertFalse( fs.isFailedServer(ia4) );
}
}