HBASE-20182 Can not locate region after split and merge
This commit is contained in:
parent
a422310dad
commit
adc0e85e85
|
@ -17,7 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
|
||||
import static org.apache.hadoop.hbase.HConstants.NINES;
|
||||
import static org.apache.hadoop.hbase.HConstants.ZEROES;
|
||||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||
|
@ -32,7 +31,6 @@ import java.util.Arrays;
|
|||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
@ -41,13 +39,14 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -269,26 +268,22 @@ class AsyncNonMetaRegionLocator {
|
|||
toSend.ifPresent(r -> locateInMeta(tableName, r));
|
||||
}
|
||||
|
||||
private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results,
|
||||
// return whether we should stop the scan
|
||||
private boolean onScanNext(TableName tableName, LocateRequest req, Result result,
|
||||
Throwable error) {
|
||||
if (error != null) {
|
||||
complete(tableName, req, null, error);
|
||||
return;
|
||||
}
|
||||
if (results.isEmpty()) {
|
||||
complete(tableName, req, null, new TableNotFoundException(tableName));
|
||||
return;
|
||||
}
|
||||
RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The fetched location of '" + tableName + "', row='" +
|
||||
Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType + " is " + locs);
|
||||
return true;
|
||||
}
|
||||
RegionLocations locs = MetaTableAccessor.getRegionLocations(result);
|
||||
LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
|
||||
Bytes.toStringBinary(req.row), req.locateType, locs);
|
||||
|
||||
if (locs == null || locs.getDefaultRegionLocation() == null) {
|
||||
complete(tableName, req, null,
|
||||
new IOException(String.format("No location found for '%s', row='%s', locateType=%s",
|
||||
tableName, Bytes.toStringBinary(req.row), req.locateType)));
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
HRegionLocation loc = locs.getDefaultRegionLocation();
|
||||
RegionInfo info = loc.getRegion();
|
||||
|
@ -296,33 +291,20 @@ class AsyncNonMetaRegionLocator {
|
|||
complete(tableName, req, null,
|
||||
new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
|
||||
tableName, Bytes.toStringBinary(req.row), req.locateType)));
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
if (!info.getTable().equals(tableName)) {
|
||||
complete(tableName, req, null, new TableNotFoundException(
|
||||
"Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"));
|
||||
return;
|
||||
}
|
||||
if (info.isSplit()) {
|
||||
complete(tableName, req, null,
|
||||
new RegionOfflineException(
|
||||
"the only available region for the required row is a split parent," +
|
||||
" the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
|
||||
return;
|
||||
}
|
||||
if (info.isOffline()) {
|
||||
complete(tableName, req, null, new RegionOfflineException("the region is offline, could" +
|
||||
" be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
|
||||
return;
|
||||
if (info.isSplitParent()) {
|
||||
return false;
|
||||
}
|
||||
if (loc.getServerName() == null) {
|
||||
complete(tableName, req, null,
|
||||
new NoServerForRegionException(
|
||||
String.format("No server address listed for region '%s', row='%s', locateType=%s",
|
||||
info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
complete(tableName, req, loc, null);
|
||||
return true;
|
||||
}
|
||||
|
||||
private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) {
|
||||
|
@ -368,21 +350,49 @@ class AsyncNonMetaRegionLocator {
|
|||
LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +
|
||||
"', locateType=" + req.locateType + " in meta");
|
||||
}
|
||||
byte[] metaKey;
|
||||
byte[] metaStartKey;
|
||||
if (req.locateType.equals(RegionLocateType.BEFORE)) {
|
||||
if (isEmptyStopRow(req.row)) {
|
||||
byte[] binaryTableName = tableName.getName();
|
||||
metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
|
||||
metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
|
||||
} else {
|
||||
metaKey = createRegionName(tableName, req.row, ZEROES, false);
|
||||
metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
|
||||
}
|
||||
} else {
|
||||
metaKey = createRegionName(tableName, req.row, NINES, false);
|
||||
metaStartKey = createRegionName(tableName, req.row, NINES, false);
|
||||
}
|
||||
byte[] metaStopKey =
|
||||
RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
|
||||
conn.getTable(META_TABLE_NAME)
|
||||
.scanAll(new Scan().withStartRow(metaKey).setReversed(true).addFamily(CATALOG_FAMILY)
|
||||
.setOneRowLimit())
|
||||
.whenComplete((results, error) -> onScanComplete(tableName, req, results, error));
|
||||
.scan(new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
|
||||
.addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
|
||||
.setReadType(ReadType.PREAD), new AdvancedScanResultConsumer() {
|
||||
|
||||
private boolean completeNormally = false;
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
onScanNext(tableName, req, null, error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
if (!completeNormally) {
|
||||
onScanNext(tableName, req, null, new TableNotFoundException(tableName));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(Result[] results, ScanController controller) {
|
||||
for (Result result : results) {
|
||||
if (onScanNext(tableName, req, result, null)) {
|
||||
completeNormally = true;
|
||||
controller.terminate();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
|
||||
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
|
@ -771,41 +772,34 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return locations;
|
||||
}
|
||||
|
||||
/*
|
||||
* Search the hbase:meta table for the HRegionLocation
|
||||
* info that contains the table and row we're seeking.
|
||||
/**
|
||||
* Search the hbase:meta table for the HRegionLocation info that contains the table and row we're
|
||||
* seeking.
|
||||
*/
|
||||
private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
|
||||
boolean useCache, boolean retry, int replicaId) throws IOException {
|
||||
|
||||
// If we are supposed to be using the cache, look in the cache to see if
|
||||
// we already have the region.
|
||||
private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
|
||||
boolean retry, int replicaId) throws IOException {
|
||||
// If we are supposed to be using the cache, look in the cache to see if we already have the
|
||||
// region.
|
||||
if (useCache) {
|
||||
RegionLocations locations = getCachedLocation(tableName, row);
|
||||
if (locations != null && locations.getRegionLocation(replicaId) != null) {
|
||||
return locations;
|
||||
}
|
||||
}
|
||||
|
||||
// build the key of the meta region we should be looking for.
|
||||
// the extra 9's on the end are necessary to allow "exact" matches
|
||||
// without knowing the precise region names.
|
||||
byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
|
||||
byte[] metaStopKey =
|
||||
RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
|
||||
|
||||
Scan s = new Scan();
|
||||
s.setReversed(true);
|
||||
s.withStartRow(metaStartKey);
|
||||
s.withStopRow(metaStopKey, true);
|
||||
s.addFamily(HConstants.CATALOG_FAMILY);
|
||||
|
||||
Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
|
||||
.addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
|
||||
.setReadType(ReadType.PREAD);
|
||||
if (this.useMetaReplicas) {
|
||||
s.setConsistency(Consistency.TIMELINE);
|
||||
}
|
||||
|
||||
int maxAttempts = (retry ? numTries : 1);
|
||||
for (int tries = 0; true; tries++) {
|
||||
for (int tries = 0; ; tries++) {
|
||||
if (tries >= maxAttempts) {
|
||||
throw new NoServerForRegionException("Unable to find region for "
|
||||
+ Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
|
||||
|
@ -821,7 +815,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
// We are only supposed to clean the cache for the specific replicaId
|
||||
metaCache.clearCache(tableName, row, replicaId);
|
||||
}
|
||||
|
||||
// Query the meta region
|
||||
long pauseBase = this.pause;
|
||||
userRegionLock.lock();
|
||||
|
@ -832,18 +825,22 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return locations;
|
||||
}
|
||||
}
|
||||
Result regionInfoRow = null;
|
||||
s.resetMvccReadPoint();
|
||||
s.setOneRowLimit();
|
||||
try (ReversedClientScanner rcs =
|
||||
new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
|
||||
rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
|
||||
regionInfoRow = rcs.next();
|
||||
}
|
||||
|
||||
boolean tableNotFound = true;
|
||||
for (;;) {
|
||||
Result regionInfoRow = rcs.next();
|
||||
if (regionInfoRow == null) {
|
||||
if (tableNotFound) {
|
||||
throw new TableNotFoundException(tableName);
|
||||
} else {
|
||||
throw new NoServerForRegionException(
|
||||
"Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
|
||||
}
|
||||
}
|
||||
tableNotFound = false;
|
||||
// convert the row result into the HRegionLocation we need!
|
||||
RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
|
||||
if (locations == null || locations.getRegionLocation(replicaId) == null) {
|
||||
|
@ -851,41 +848,41 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
}
|
||||
RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
|
||||
if (regionInfo == null) {
|
||||
throw new IOException("RegionInfo null or empty in " +
|
||||
TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
|
||||
throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
|
||||
", row=" + regionInfoRow);
|
||||
}
|
||||
|
||||
// possible we got a region of a different table...
|
||||
if (!regionInfo.getTable().equals(tableName)) {
|
||||
throw new TableNotFoundException(
|
||||
"Region of '" + regionInfo.getRegionNameAsString() + "' is expected in the table of '" + tableName + "', " +
|
||||
"but hbase:meta says it is in the table of '" + regionInfo.getTable() + "'. " +
|
||||
"hbase:meta might be damaged.");
|
||||
}
|
||||
if (regionInfo.isSplit()) {
|
||||
throw new RegionOfflineException ("Region for row is a split parent, daughters not online: " +
|
||||
regionInfo.getRegionNameAsString());
|
||||
// See HBASE-20182. It is possible that we locate to a split parent even after the
|
||||
// children are online, so here we need to skip this region and go to the next one.
|
||||
if (regionInfo.isSplitParent()) {
|
||||
continue;
|
||||
}
|
||||
if (regionInfo.isOffline()) {
|
||||
throw new RegionOfflineException("Region offline; disable table call? " +
|
||||
regionInfo.getRegionNameAsString());
|
||||
}
|
||||
|
||||
// It is possible that the split children have not been online yet and we have skipped
|
||||
// the parent in the above condition, so we may have already reached a region which does
|
||||
// not contains us.
|
||||
if (!regionInfo.containsRow(row)) {
|
||||
throw new NoServerForRegionException(
|
||||
"Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
|
||||
}
|
||||
ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
|
||||
if (serverName == null) {
|
||||
throw new NoServerForRegionException("No server address listed in "
|
||||
+ TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString()
|
||||
+ " containing row " + Bytes.toStringBinary(row));
|
||||
throw new NoServerForRegionException("No server address listed in " +
|
||||
TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
|
||||
" containing row " + Bytes.toStringBinary(row));
|
||||
}
|
||||
|
||||
if (isDeadServer(serverName)){
|
||||
throw new RegionServerStoppedException("hbase:meta says the region "+
|
||||
regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
|
||||
", but it is dead.");
|
||||
if (isDeadServer(serverName)) {
|
||||
throw new RegionServerStoppedException(
|
||||
"hbase:meta says the region " + regionInfo.getRegionNameAsString() +
|
||||
" is managed by the server " + serverName + ", but it is dead.");
|
||||
}
|
||||
// Instantiate the location
|
||||
cacheLocation(tableName, locations);
|
||||
return locations;
|
||||
}
|
||||
}
|
||||
} catch (TableNotFoundException e) {
|
||||
// if we got this error, probably means the table just plain doesn't
|
||||
// exist. rethrow the error immediately. this should always be coming
|
||||
|
@ -901,12 +898,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
pauseBase = this.pauseForCQTBE;
|
||||
}
|
||||
if (tries < maxAttempts - 1) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("locateRegionInMeta parentTable=" + TableName.META_TABLE_NAME
|
||||
+ ", metaLocation=" + ", attempt=" + tries + " of " + maxAttempts
|
||||
+ " failed; retrying after sleep of "
|
||||
+ ConnectionUtils.getPauseTime(pauseBase, tries) + " because: " + e.getMessage());
|
||||
}
|
||||
LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " +
|
||||
"after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MiscTests.class, MediumTests.class })
|
||||
public class TestSplitMerge {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSplitMerge.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 1000);
|
||||
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
TableName tableName = TableName.valueOf("SplitMerge");
|
||||
byte[] family = Bytes.toBytes("CF");
|
||||
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
|
||||
UTIL.getAdmin().createTable(td, new byte[][] { Bytes.toBytes(1) });
|
||||
UTIL.waitTableAvailable(tableName);
|
||||
UTIL.getAdmin().split(tableName, Bytes.toBytes(2));
|
||||
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
|
||||
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return UTIL.getMiniHBaseCluster().getRegions(tableName).size() == 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String explainFailure() throws Exception {
|
||||
return "Split has not finished yet";
|
||||
}
|
||||
});
|
||||
RegionInfo regionA = null;
|
||||
RegionInfo regionB = null;
|
||||
for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
|
||||
if (region.getStartKey().length == 0) {
|
||||
regionA = region;
|
||||
} else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(1))) {
|
||||
regionB = region;
|
||||
}
|
||||
}
|
||||
assertNotNull(regionA);
|
||||
assertNotNull(regionB);
|
||||
UTIL.getAdmin().mergeRegionsAsync(regionA.getRegionName(), regionB.getRegionName(), false)
|
||||
.get(30, TimeUnit.SECONDS);
|
||||
assertEquals(2, UTIL.getAdmin().getRegions(tableName).size());
|
||||
|
||||
ServerName expected = UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName();
|
||||
assertEquals(expected, UTIL.getConnection().getRegionLocator(tableName)
|
||||
.getRegionLocation(Bytes.toBytes(1), true).getServerName());
|
||||
try (AsyncConnection asyncConn =
|
||||
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get()) {
|
||||
assertEquals(expected, asyncConn.getRegionLocator(tableName)
|
||||
.getRegionLocation(Bytes.toBytes(1), true).get().getServerName());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -87,9 +87,9 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan)
|
||||
throws IOException {
|
||||
if (e.getEnvironment().getRegionInfo().isMetaRegion()) {
|
||||
public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
|
||||
if (c.getEnvironment().getRegionInfo().isMetaRegion()) {
|
||||
int concurrency = CONCURRENCY.incrementAndGet();
|
||||
for (;;) {
|
||||
int max = MAX_CONCURRENCY.get();
|
||||
|
@ -102,14 +102,16 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
|
|||
}
|
||||
Threads.sleepWithoutInterrupt(10);
|
||||
}
|
||||
return hasNext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s)
|
||||
throws IOException {
|
||||
if (e.getEnvironment().getRegionInfo().isMetaRegion()) {
|
||||
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
|
||||
if (c.getEnvironment().getRegionInfo().isMetaRegion()) {
|
||||
CONCURRENCY.decrementAndGet();
|
||||
}
|
||||
return hasNext;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,7 +121,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
|
|||
conf.set(REGION_COPROCESSOR_CONF_KEY, CountingRegionObserver.class.getName());
|
||||
conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED);
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
|
||||
TEST_UTIL.getAdmin().balancerSwitch(false, true);
|
||||
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
|
||||
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
|
||||
registry.getClusterId().get(), User.getCurrent());
|
||||
|
@ -142,14 +144,14 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
|
|||
for (int i = 0; i < futures.size(); i++) {
|
||||
HRegionLocation loc = futures.get(i).get();
|
||||
if (i == 0) {
|
||||
assertTrue(isEmptyStartRow(loc.getRegionInfo().getStartKey()));
|
||||
assertTrue(isEmptyStartRow(loc.getRegion().getStartKey()));
|
||||
} else {
|
||||
assertEquals(String.format("%02x", i), Bytes.toString(loc.getRegionInfo().getStartKey()));
|
||||
assertEquals(String.format("%02x", i), Bytes.toString(loc.getRegion().getStartKey()));
|
||||
}
|
||||
if (i == futures.size() - 1) {
|
||||
assertTrue(isEmptyStopRow(loc.getRegionInfo().getEndKey()));
|
||||
assertTrue(isEmptyStopRow(loc.getRegion().getEndKey()));
|
||||
} else {
|
||||
assertEquals(String.format("%02x", i + 1), Bytes.toString(loc.getRegionInfo().getEndKey()));
|
||||
assertEquals(String.format("%02x", i + 1), Bytes.toString(loc.getRegion().getEndKey()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -161,6 +163,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
|
|||
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
|
||||
.collect(toList());
|
||||
assertLocs(futures);
|
||||
assertTrue(MAX_CONCURRENCY.get() <= MAX_ALLOWED);
|
||||
assertTrue("max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get(),
|
||||
MAX_CONCURRENCY.get() <= MAX_ALLOWED);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue