[HBASE-24956] ConnectionManager#locateRegionInMeta waits for user region lock indefinitely. (#2322)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-Off-By: Andrew Purtell <apurtell@apache.org> Signed-off by: Viraj Jasani <vjasani@apache.org> Signed-off-by: Michael Stack <stack@apache.org> Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
4316dc738c
commit
7fe07e90eb
@ -63,7 +63,7 @@ public class ConnectionConfiguration {
|
|||||||
// toggle for async/sync prefetch
|
// toggle for async/sync prefetch
|
||||||
private final boolean clientScannerAsyncPrefetch;
|
private final boolean clientScannerAsyncPrefetch;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param conf Configuration object
|
* @param conf Configuration object
|
||||||
*/
|
*/
|
||||||
@ -208,5 +208,4 @@ public class ConnectionConfiguration {
|
|||||||
public int getRpcTimeout() {
|
public int getRpcTimeout() {
|
||||||
return rpcTimeout;
|
return rpcTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -864,13 +864,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||||||
}
|
}
|
||||||
// Query the meta region
|
// Query the meta region
|
||||||
long pauseBase = this.pause;
|
long pauseBase = this.pause;
|
||||||
userRegionLock.lock();
|
takeUserRegionLock();
|
||||||
try {
|
try {
|
||||||
if (useCache) {// re-check cache after get lock
|
// We don't need to check if useCache is enabled or not. Even if useCache is false
|
||||||
RegionLocations locations = getCachedLocation(tableName, row);
|
// we already cleared the cache for this row before acquiring userRegion lock so if this
|
||||||
if (locations != null && locations.getRegionLocation(replicaId) != null) {
|
// row is present in cache that means some other thread has populated it while we were
|
||||||
return locations;
|
// waiting to acquire user region lock.
|
||||||
}
|
RegionLocations locations = getCachedLocation(tableName, row);
|
||||||
|
if (locations != null && locations.getRegionLocation(replicaId) != null) {
|
||||||
|
return locations;
|
||||||
}
|
}
|
||||||
if (relocateMeta) {
|
if (relocateMeta) {
|
||||||
relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
|
relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
|
||||||
@ -893,7 +895,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||||||
}
|
}
|
||||||
tableNotFound = false;
|
tableNotFound = false;
|
||||||
// convert the row result into the HRegionLocation we need!
|
// convert the row result into the HRegionLocation we need!
|
||||||
RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
|
locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
|
||||||
if (locations == null || locations.getRegionLocation(replicaId) == null) {
|
if (locations == null || locations.getRegionLocation(replicaId) == null) {
|
||||||
throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
|
throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
|
||||||
}
|
}
|
||||||
@ -969,6 +971,19 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void takeUserRegionLock() throws IOException {
|
||||||
|
try {
|
||||||
|
long waitTime = connectionConfig.getMetaOperationTimeout();
|
||||||
|
if (!userRegionLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
|
||||||
|
throw new LockTimeoutException("Failed to get user region lock in"
|
||||||
|
+ waitTime + " ms. " + " for accessing meta region server.");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.error("Interrupted while waiting for a lock", ie);
|
||||||
|
throw ExceptionUtil.asInterrupt(ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Put a newly discovered HRegionLocation into the cache.
|
* Put a newly discovered HRegionLocation into the cache.
|
||||||
* @param tableName The table name.
|
* @param tableName The table name.
|
||||||
|
@ -0,0 +1,32 @@
|
|||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/*
|
||||||
|
Thrown whenever we are not able to get the lock within the specified wait time.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
public class LockTimeoutException extends HBaseIOException {
|
||||||
|
public LockTimeoutException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
@ -58,6 +58,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
|||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@Category({MediumTests.class, ClientTests.class})
|
@Category({MediumTests.class, ClientTests.class})
|
||||||
public class TestMetaCache {
|
public class TestMetaCache {
|
||||||
@ -70,8 +72,8 @@ public class TestMetaCache {
|
|||||||
private static final TableName TABLE_NAME = TableName.valueOf("test_table");
|
private static final TableName TABLE_NAME = TableName.valueOf("test_table");
|
||||||
private static final byte[] FAMILY = Bytes.toBytes("fam1");
|
private static final byte[] FAMILY = Bytes.toBytes("fam1");
|
||||||
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
|
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
|
||||||
|
|
||||||
private static HRegionServer badRS;
|
private static HRegionServer badRS;
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws java.lang.Exception
|
* @throws java.lang.Exception
|
||||||
@ -369,4 +371,77 @@ public class TestMetaCache {
|
|||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUserRegionLockThrowsException() throws IOException, InterruptedException {
|
||||||
|
((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector());
|
||||||
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
|
||||||
|
conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
|
||||||
|
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000);
|
||||||
|
|
||||||
|
try (ConnectionImplementation conn =
|
||||||
|
(ConnectionImplementation) ConnectionFactory.createConnection(conf)) {
|
||||||
|
ClientThread client1 = new ClientThread(conn);
|
||||||
|
ClientThread client2 = new ClientThread(conn);
|
||||||
|
client1.start();
|
||||||
|
client2.start();
|
||||||
|
client1.join();
|
||||||
|
client2.join();
|
||||||
|
// One thread will get the lock but will sleep in LockExceptionInjector#throwOnScan and
|
||||||
|
// eventually fail since the sleep time is more than hbase client scanner timeout period.
|
||||||
|
// Other thread will wait to acquire userRegionLock.
|
||||||
|
// Have no idea which thread will be scheduled first. So need to check both threads.
|
||||||
|
|
||||||
|
// Both the threads will throw exception. One thread will throw exception since after
|
||||||
|
// acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period
|
||||||
|
// is 2 seconds.
|
||||||
|
// Other thread will throw exception since it was not able to get hold of user region lock
|
||||||
|
// within meta operation timeout period.
|
||||||
|
assertNotNull(client1.getException());
|
||||||
|
assertNotNull(client2.getException());
|
||||||
|
|
||||||
|
assertTrue(client1.getException() instanceof LockTimeoutException
|
||||||
|
^ client2.getException() instanceof LockTimeoutException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class ClientThread extends Thread {
|
||||||
|
private Exception exception;
|
||||||
|
private ConnectionImplementation connection;
|
||||||
|
|
||||||
|
private ClientThread(ConnectionImplementation connection) {
|
||||||
|
this.connection = connection;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
byte[] currentKey = HConstants.EMPTY_START_ROW;
|
||||||
|
try {
|
||||||
|
connection.getRegionLocation(TABLE_NAME, currentKey, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Thread id: " + this.getId() + " exception: ", e);
|
||||||
|
this.exception = e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public Exception getException() {
|
||||||
|
return exception;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class LockSleepInjector extends ExceptionInjector {
|
||||||
|
@Override
|
||||||
|
public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.info("Interrupted exception", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) { }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) { }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user