[HBASE-24956] ConnectionManager#locateRegionInMeta waits for user region lock indefinitely. (#2415)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
f0acafc58e
commit
e1fc3c4fd0
|
@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
|
|||
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
|
@ -1316,13 +1317,15 @@ class ConnectionManager {
|
|||
|
||||
// Query the meta region
|
||||
long pauseBase = this.pause;
|
||||
userRegionLock.lock();
|
||||
takeUserRegionLock();
|
||||
try {
|
||||
if (useCache) {// re-check cache after get lock
|
||||
RegionLocations locations = getCachedLocation(tableName, row);
|
||||
if (locations != null && locations.getRegionLocation(replicaId) != null) {
|
||||
return locations;
|
||||
}
|
||||
// We don't need to check if useCache is enabled or not. Even if useCache is false
|
||||
// we already cleared the cache for this row before acquiring userRegion lock so if this
|
||||
// row is present in cache that means some other thread has populated it while we were
|
||||
// waiting to acquire user region lock.
|
||||
RegionLocations locations = getCachedLocation(tableName, row);
|
||||
if (locations != null && locations.getRegionLocation(replicaId) != null) {
|
||||
return locations;
|
||||
}
|
||||
Result regionInfoRow = null;
|
||||
s.resetMvccReadPoint();
|
||||
|
@ -1339,7 +1342,7 @@ class ConnectionManager {
|
|||
}
|
||||
|
||||
// convert the row result into the HRegionLocation we need!
|
||||
RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
|
||||
locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
|
||||
if (locations == null || locations.getRegionLocation(replicaId) == null) {
|
||||
throw new IOException("HRegionInfo was null in " +
|
||||
tableName + ", row=" + regionInfoRow);
|
||||
|
@ -1423,6 +1426,19 @@ class ConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
void takeUserRegionLock() throws IOException {
|
||||
try {
|
||||
long waitTime = connectionConfig.getMetaOperationTimeout();
|
||||
if (!userRegionLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
|
||||
throw new LockTimeoutException("Failed to get user region lock in"
|
||||
+ waitTime + " ms. " + " for accessing meta region server.");
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.error("Interrupted while waiting for a lock", ie);
|
||||
throw ExceptionUtil.asInterrupt(ie);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Put a newly discovered HRegionLocation into the cache.
|
||||
* @param tableName The table name.
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.*;
|
||||
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
|
||||
|
@ -42,6 +43,8 @@ import org.junit.experimental.categories.Category;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static junit.framework.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
@ -57,6 +60,7 @@ public class TestMetaCache {
|
|||
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
|
||||
|
||||
private static HRegionServer badRS;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class);
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
|
@ -356,4 +360,77 @@ public class TestMetaCache {
|
|||
throws ServiceException {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserRegionLockThrowsException() throws IOException, InterruptedException {
|
||||
((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector());
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
|
||||
conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
|
||||
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000);
|
||||
|
||||
try (ConnectionManager.HConnectionImplementation conn =
|
||||
(ConnectionManager.HConnectionImplementation) ConnectionFactory.createConnection(conf)) {
|
||||
ClientThread client1 = new ClientThread(conn);
|
||||
ClientThread client2 = new ClientThread(conn);
|
||||
client1.start();
|
||||
client2.start();
|
||||
client1.join();
|
||||
client2.join();
|
||||
// One thread will get the lock but will sleep in LockExceptionInjector#throwOnScan and
|
||||
// eventually fail since the sleep time is more than hbase client scanner timeout period.
|
||||
// Other thread will wait to acquire userRegionLock.
|
||||
// Have no idea which thread will be scheduled first. So need to check both threads.
|
||||
|
||||
// Both the threads will throw exception. One thread will throw exception since after
|
||||
// acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period
|
||||
// is 2 seconds.
|
||||
// Other thread will throw exception since it was not able to get hold of user region lock
|
||||
// within meta operation timeout period.
|
||||
assertNotNull(client1.getException());
|
||||
assertNotNull(client2.getException());
|
||||
|
||||
assertTrue(client1.getException() instanceof LockTimeoutException
|
||||
^ client2.getException() instanceof LockTimeoutException);
|
||||
}
|
||||
}
|
||||
|
||||
private final class ClientThread extends Thread {
|
||||
private Exception exception;
|
||||
private ConnectionManager.HConnectionImplementation connection;
|
||||
|
||||
private ClientThread(ConnectionManager.HConnectionImplementation connection) {
|
||||
this.connection = connection;
|
||||
}
|
||||
@Override
|
||||
public void run() {
|
||||
byte[] currentKey = HConstants.EMPTY_START_ROW;
|
||||
try {
|
||||
connection.getRegionLocation(TABLE_NAME, currentKey, true);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Thread id: " + this.getId() + " exception: ", e);
|
||||
this.exception = e;
|
||||
}
|
||||
}
|
||||
public Exception getException() {
|
||||
return exception;
|
||||
}
|
||||
}
|
||||
|
||||
public static class LockSleepInjector extends ExceptionInjector {
|
||||
@Override
|
||||
public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) {
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted exception", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) { }
|
||||
|
||||
@Override
|
||||
public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) { }
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue