HBASE-10337 HTable.get() uninterruptible (Nicolas Liochon)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1564851 13f79535-47bb-0310-9956-ffa450edef68
Andrew Kyle Purtell 2014-02-05 18:00:09 +00:00
parent 7898e68fd4
commit 8c23d35d50
6 changed files with 234 additions and 20 deletions
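In short, the patch makes a thread blocked in HTable.get() responsive to Thread.interrupt(): the interrupt now surfaces as an InterruptedIOException instead of being swallowed by the client's wait/retry loops. A minimal sketch of the client-visible effect (assumes a running cluster with an existing table "test" and row "r1", mirroring the test below; this snippet is illustrative and not part of the patch):

import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class InterruptibleGetDemo {
  public static void main(String[] args) throws Exception {
    final Configuration conf = HBaseConfiguration.create();
    Thread reader = new Thread() {
      @Override
      public void run() {
        try {
          HTable ht = new HTable(conf, Bytes.toBytes("test"));
          ht.get(new Get(Bytes.toBytes("r1")));
        } catch (InterruptedIOException expected) {
          // with this patch, the interrupt below lands here promptly
        } catch (IOException e) {
          // any other IOException would be a real failure
        }
      }
    };
    reader.start();
    reader.interrupt(); // before HBASE-10337 this was latched and ignored until the get() completed
    reader.join();
  }
}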

org/apache/hadoop/hbase/client/HConnectionManager.java

@@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -532,6 +533,7 @@ public class HConnectionManager {
try {
connection.close();
} catch (Exception e) {
ExceptionUtil.rethrowIfInterrupt(e);
if (connectSucceeded) {
throw new IOException("The connection to " + connection
+ " could not be deleted.", e);
@@ -1123,7 +1125,11 @@ public class HConnectionManager {
MetaScanner.metaScan(conf, this, visitor, tableName, row,
this.prefetchRegionLimit, TableName.META_TABLE_NAME);
} catch (IOException e) {
LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
if (ExceptionUtil.isInterrupt(e)) {
Thread.currentThread().interrupt();
} else {
LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
}
}
}
@@ -1252,6 +1258,8 @@ public class HConnectionManager {
// from the HTable constructor.
throw e;
} catch (IOException e) {
ExceptionUtil.rethrowIfInterrupt(e);
if (e instanceof RemoteException) {
e = ((RemoteException)e).unwrapRemoteException();
}
@@ -1528,6 +1536,7 @@ public class HConnectionManager {
try {
zkw = getKeepAliveZooKeeperWatcher();
} catch (IOException e) {
ExceptionUtil.rethrowIfInterrupt(e);
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
}
try {
@@ -1588,7 +1597,7 @@ public class HConnectionManager {
if (exceptionCaught != null)
// It failed. If it's not the last try, we're going to wait a little
if (tries < numTries) {
if (tries < numTries && !ExceptionUtil.isInterrupt(exceptionCaught)) {
// tries at this point is 1 or more; decrement to start from 0.
long pauseTime = ConnectionUtils.getPauseTime(pause, tries - 1);
LOG.info("getMaster attempt " + tries + " of " + numTries +
@@ -1598,7 +1607,7 @@ public class HConnectionManager {
try {
Thread.sleep(pauseTime);
} catch (InterruptedException e) {
throw new RuntimeException(
throw new MasterNotRunningException(
"Thread was interrupted while trying to connect to master.", e);
}
} else {

org/apache/hadoop/hbase/client/RpcRetryingCaller.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.ipc.RemoteException;
import com.google.protobuf.ServiceException;
@@ -130,6 +131,7 @@ public class RpcRetryingCaller<T> {
new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTimeMillis(), toString());
exceptions.add(qt);
ExceptionUtil.rethrowIfInterrupt(t);
if (tries >= retries - 1) {
throw new RetriesExhaustedException(tries, exceptions);
}
@@ -184,6 +186,7 @@ public class RpcRetryingCaller<T> {
return callable.call();
} catch (Throwable t) {
Throwable t2 = translateException(t);
ExceptionUtil.rethrowIfInterrupt(t2);
// It would be nice to clear the location cache here.
if (t2 instanceof IOException) {
throw (IOException)t2;

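Both RpcRetryingCaller hunks apply the same rule: translate the caught Throwable, let an interruption escape the retry loop immediately via ExceptionUtil.rethrowIfInterrupt(), and only then fall into the record-and-retry path. A hypothetical condensation of that control flow (the real class tracks more state; the names here are placeholders):

import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.util.ExceptionUtil;

public final class RetrySketch {
  public static <T> T callWithRetries(Callable<T> callable, int retries) throws IOException {
    for (int tries = 0; ; tries++) {
      try {
        return callable.call();
      } catch (Throwable t) {
        // an interrupted caller stops retrying at once
        ExceptionUtil.rethrowIfInterrupt(t);
        if (tries >= retries - 1) {
          throw new IOException("retries exhausted after " + retries + " attempts", t);
        }
        // otherwise: record the failure, back off, and loop for another attempt
      }
    }
  }
}
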
org/apache/hadoop/hbase/ipc/RpcClient.java

@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
@@ -642,18 +644,19 @@ public class RpcClient {
*/
private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
throws IOException {
closeConnection();
// throw the exception if the maximum number of retries is reached
if (curRetries >= maxRetries) {
if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
throw ioe;
}
// otherwise back off and retry
try {
Thread.sleep(failureSleep);
} catch (InterruptedException ignored) {}
} catch (InterruptedException ie) {
ExceptionUtil.rethrowIfInterrupt(ie);
}
LOG.info("Retrying connect to server: " + remoteId.getAddress() +
" after sleeping " + failureSleep + "ms. Already tried " + curRetries +
@@ -672,7 +675,9 @@
if (timeout>0) {
try {
wait(timeout);
} catch (InterruptedException ignored) {}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
@@ -1112,6 +1117,8 @@
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
closeException = e;
} else if (ExceptionUtil.isInterrupt(e)) {
closeException = e;
} else {
// Treat this as a fatal condition and close this connection
markClosed(e);
@@ -1425,24 +1432,14 @@
Connection connection =
getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
connection.writeRequest(call, priority); // send the parameter
boolean interrupted = false;
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (call) {
while (!call.done) {
if (connection.shouldCloseConnection.get()) {
throw new IOException("Unexpected closed connection");
}
try {
call.wait(1000); // wait for the result
} catch (InterruptedException ignored) {
// save the fact that we were interrupted
interrupted = true;
}
}
if (interrupted) {
// set the interrupt flag now that we are done waiting
Thread.currentThread().interrupt();
call.wait(1000); // wait for the result
}
if (call.error != null) {

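The call() rewrite above is the core of the fix: the old loop caught InterruptedException, latched it in a local flag, and kept waiting, which is exactly what made a blocked HTable.get() uninterruptible; the new code leaves call.wait(1000) uncaught so the InterruptedException propagates to the caller at once. A standalone toy contrasting the two shapes (not HBase code):

public final class WaitLoopSketch {
  private final Object call = new Object();
  private volatile boolean done = false;

  // Old shape: latch the interrupt and keep waiting until the call completes.
  void oldWait() {
    boolean interrupted = false;
    synchronized (call) {
      while (!done) {
        try {
          call.wait(1000);
        } catch (InterruptedException e) {
          interrupted = true; // swallowed; waiting continues
        }
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // restored only after the wait ends
    }
  }

  // New shape: let InterruptedException reach the caller immediately.
  void newWait() throws InterruptedException {
    synchronized (call) {
      while (!done) {
        call.wait(1000);
      }
    }
  }
}
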
org/apache/hadoop/hbase/protobuf/ProtobufUtil.java

@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DynamicClassLoader;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
@@ -276,8 +277,11 @@ public final class ProtobufUtil {
if (e == null) {
return new IOException(se);
}
if (ExceptionUtil.isInterrupt(e)) {
return ExceptionUtil.asInterrupt(e);
}
if (e instanceof RemoteException) {
e = ((RemoteException)e).unwrapRemoteException();
e = ((RemoteException) e).unwrapRemoteException();
}
return e instanceof IOException ? (IOException) e : new IOException(se);
}

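This hunk hardens the ServiceException-unwrapping helper in ProtobufUtil (getRemoteException) so that an interruption buried inside the wrapper resurfaces as an InterruptedIOException. The usual call-site shape, sketched here with placeholder stub and request names (illustrative, not from this commit):

import java.io.IOException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import com.google.protobuf.ServiceException;

public class UnwrapSketch {
  // stand-in for any protobuf stub invocation; purely illustrative
  static Object callStub() throws ServiceException {
    throw new ServiceException("demo");
  }

  public static Object callAndUnwrap() throws IOException {
    try {
      return callStub();
    } catch (ServiceException se) {
      // with this hunk, an interrupt wrapped in se comes back as an
      // InterruptedIOException rather than a generic IOException
      throw ProtobufUtil.getRemoteException(se);
    }
  }
}
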
org/apache/hadoop/hbase/util/ExceptionUtil.java

@@ -0,0 +1,54 @@
package org.apache.hadoop.hbase.util;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedByInterruptException;
/**
* This class handles the different interruption classes.
* It can be:
* - InterruptedException
* - InterruptedIOException (inherits IOException); used in IO
* - ClosedByInterruptException (inherits IOException)
* - SocketTimeoutException inherits InterruptedIOException but is not a real
* interruption, so we have to distinguish the case. This pattern is unfortunately common.
*/
public class ExceptionUtil {
/**
* @return true if the throwable comes from an interruption, false otherwise.
*/
public static boolean isInterrupt(Throwable t) {
if (t instanceof InterruptedException) return true;
if (t instanceof SocketTimeoutException) return false;
return (t instanceof InterruptedIOException);
}
/**
* @throws InterruptedIOException if t was an interruption. Does nothing otherwise.
*/
public static void rethrowIfInterrupt(Throwable t) throws InterruptedIOException {
InterruptedIOException iie = asInterrupt(t);
if (iie != null) throw iie;
}
/**
* @return an InterruptedIOException if t was an interruption, null otherwise
*/
public static InterruptedIOException asInterrupt(Throwable t) {
if (t instanceof SocketTimeoutException) return null;
if (t instanceof InterruptedIOException) return (InterruptedIOException) t;
if (t instanceof InterruptedException) {
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(t);
return iie;
}
return null;
}
private ExceptionUtil() {
}
}

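The subtle case this new class exists for is the last javadoc bullet: SocketTimeoutException extends InterruptedIOException but signals a timeout, not an interruption. A small standalone demonstration of the contracts above (illustrative, not part of the commit):

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import org.apache.hadoop.hbase.util.ExceptionUtil;

public class ExceptionUtilDemo {
  public static void main(String[] args) throws IOException {
    System.out.println(ExceptionUtil.isInterrupt(new InterruptedException()));   // true
    System.out.println(ExceptionUtil.isInterrupt(new InterruptedIOException())); // true
    System.out.println(ExceptionUtil.isInterrupt(new SocketTimeoutException())); // false, despite the inheritance
    System.out.println(ExceptionUtil.asInterrupt(new SocketTimeoutException())); // null

    ExceptionUtil.rethrowIfInterrupt(new SocketTimeoutException()); // no-op for a timeout
    ExceptionUtil.rethrowIfInterrupt(new InterruptedException());   // throws InterruptedIOException
  }
}
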
org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java

@@ -0,0 +1,147 @@
package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@Category(MediumTests.class)
public class TestClientOperationInterrupt {
private static final Log LOG = LogFactory.getLog(TestClientOperationInterrupt.class);
private static HBaseTestingUtility util;
private static final byte[] tableName = Bytes.toBytes("test");
private static final byte[] dummy = Bytes.toBytes("dummy");
private static final byte[] row1 = Bytes.toBytes("r1");
private static final byte[] test = Bytes.toBytes("test");
private static Configuration conf;
public static class TestCoprocessor extends BaseRegionObserver {
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
Threads.sleep(2500);
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
TestCoprocessor.class.getName());
util = new HBaseTestingUtility(conf);
util.startMiniCluster();
HBaseAdmin admin = util.getHBaseAdmin();
if (admin.tableExists(tableName)) {
if (admin.isTableEnabled(tableName)) {
admin.disableTable(tableName);
}
admin.deleteTable(tableName);
}
util.createTable(tableName, new byte[][]{dummy, test});
HTable ht = new HTable(conf, tableName);
Put p = new Put(row1);
p.add(dummy, dummy, dummy);
ht.put(p);
}
@Test
public void testInterrupt50Percent() throws IOException, InterruptedException {
final AtomicInteger noEx = new AtomicInteger(0);
final AtomicInteger badEx = new AtomicInteger(0);
final AtomicInteger noInt = new AtomicInteger(0);
final AtomicInteger done = new AtomicInteger(0);
List<Thread> threads = new ArrayList<Thread>();
final int nbThread = 100;
for (int i = 0; i < nbThread; i++) {
Thread t = new Thread() {
@Override
public void run() {
try {
HTable ht = new HTable(conf, tableName);
Result r = ht.get(new Get(row1));
noEx.incrementAndGet();
} catch (IOException e) {
LOG.info("exception", e);
if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) {
badEx.incrementAndGet();
} else {
if (Thread.currentThread().isInterrupted()) {
noInt.incrementAndGet();
LOG.info("The thread should NOT be with the 'interrupt' status.");
}
}
} finally {
done.incrementAndGet();
}
}
};
t.setName("TestClientOperationInterrupt #" + i);
threads.add(t);
t.start();
}
for (int i = 0; i < nbThread / 2; i++) {
threads.get(i).interrupt();
}
boolean stillAlive = true;
while (stillAlive) {
stillAlive = false;
for (Thread t : threads) {
if (t.isAlive()) {
stillAlive = true;
}
}
Threads.sleep(10);
}
Assert.assertFalse(Thread.currentThread().isInterrupted());
Assert.assertTrue(" noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get(),
noEx.get() == nbThread / 2 && badEx.get() == 0);
// The problem here is that we need the server to free its handlers to handle all operations
while (done.get() != nbThread){
Thread.sleep(1);
}
HTable ht = new HTable(conf, tableName);
Result r = ht.get(new Get(row1));
Assert.assertFalse(r.isEmpty());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
}