HBASE-25126 Add load balance logic in hbase-client to distribute read load over meta replica regions
It adds load balance support for meta lookup in AsyncTableRegionLocator. The existing meta replica mode is renamed as "HedgedRead", client sends scan request to the primary meta replica region first, if response is not back within a configured amount of time, it will send scan requests to all meta replica regions and take the first response. On top of the existing mode, a new mode "LoadBalance" is introduced. In this mode, client first choose a meta replica region to send scan request. If the response is stale, it may send the request to another meta replica region or the primary region. In this mode, meta scan requests are load balanced across all replica regions with the primary mode as the ultimate source of truth. Two new config knobs are added: 1. hbase.locator.meta.replicas.mode Valid options are "None", "HedgedRead" and "LoadBalance", they are case insensitive. The default mode is "None". 2. hbase.locator.meta.replicas.mode.loadbalance.selector The load balance alogrithm to select a meta replica to send the requests. Only org.apache.hadoop.hbase.client.CatalogReplicaLoadBalanceReplicaSimpleSelector.class is supported for now, which is the default as well. The algorithm works as follows: a. Clients select a randome meta replica region to send the requests if there is no entry for the location in the stale location cache. b. If the location from one meta replica region is stale, a stale entry will be created in the statle location cache for the region. c. Clients select the primary meta region if the location is in the stale location cache. d. The stale location cache entries time out in 3 seconds. If there is no "hbase.locator.meta.replicas.mode" configured, it will check the config knob "hbase.meta.replicas.use". If "hbase.meta.replicas.use" is configured, the mode will be set to "HedgedRead". For branch-2 support, it introduces support for ReversedScan over a specific non-default replica region, this is mainly for load balance meta scan among all its replica regions. Remove ConnectionImplementation#setUseMetaReplicas()
This commit is contained in:
parent
47ad207664
commit
920cf9faf4
|
@ -113,7 +113,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
private final Optional<ServerStatisticTracker> stats;
|
||||
private final ClientBackoffPolicy backoffPolicy;
|
||||
|
||||
private ChoreService authService;
|
||||
private ChoreService choreService;
|
||||
|
||||
private volatile boolean closed = false;
|
||||
|
||||
|
@ -125,6 +125,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
User user) {
|
||||
this.conf = conf;
|
||||
this.user = user;
|
||||
|
||||
if (user.isLoginFromKeytab()) {
|
||||
spawnRenewalChore(user.getUGI());
|
||||
}
|
||||
|
@ -176,8 +177,19 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
}
|
||||
|
||||
private void spawnRenewalChore(final UserGroupInformation user) {
|
||||
authService = new ChoreService("Relogin service");
|
||||
authService.scheduleChore(AuthUtil.getAuthRenewalChore(user));
|
||||
ChoreService service = getChoreService();
|
||||
service.scheduleChore(AuthUtil.getAuthRenewalChore(user));
|
||||
}
|
||||
|
||||
/**
|
||||
* If choreService has not been created yet, create the ChoreService.
|
||||
* @return ChoreService
|
||||
*/
|
||||
synchronized ChoreService getChoreService() {
|
||||
if (choreService == null) {
|
||||
choreService = new ChoreService("AsyncConn Chore Service");
|
||||
}
|
||||
return choreService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -199,8 +211,8 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
IOUtils.closeQuietly(clusterStatusListener);
|
||||
IOUtils.closeQuietly(rpcClient);
|
||||
IOUtils.closeQuietly(registry);
|
||||
if (authService != null) {
|
||||
authService.shutdown();
|
||||
if (choreService != null) {
|
||||
choreService.shutdown();
|
||||
}
|
||||
metrics.ifPresent(MetricsConnection::shutdown);
|
||||
closed = true;
|
||||
|
|
|
@ -30,6 +30,7 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegi
|
|||
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
||||
import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
|
||||
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
|
||||
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
|
||||
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
|
||||
|
||||
|
@ -46,6 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -89,7 +91,10 @@ class AsyncNonMetaRegionLocator {
|
|||
|
||||
private final int locatePrefetchLimit;
|
||||
|
||||
private final boolean useMetaReplicas;
|
||||
// The mode tells if HedgedRead, LoadBalance mode is supported.
|
||||
// The default mode is CatalogReplicaMode.None.
|
||||
private CatalogReplicaMode metaReplicaMode;
|
||||
private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
|
||||
|
||||
private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
|
||||
|
||||
|
@ -196,8 +201,42 @@ class AsyncNonMetaRegionLocator {
|
|||
MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
|
||||
this.locatePrefetchLimit =
|
||||
conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
|
||||
this.useMetaReplicas =
|
||||
conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
|
||||
|
||||
// Get the region locator's meta replica mode.
|
||||
this.metaReplicaMode = CatalogReplicaMode.fromString(conn.getConfiguration()
|
||||
.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
|
||||
|
||||
switch (this.metaReplicaMode) {
|
||||
case LOAD_BALANCE:
|
||||
String replicaSelectorClass = conn.getConfiguration().
|
||||
get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
|
||||
CatalogReplicaLoadBalanceSimpleSelector.class.getName());
|
||||
|
||||
this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
|
||||
replicaSelectorClass, META_TABLE_NAME, conn.getChoreService(), () -> {
|
||||
int numOfReplicas = 1;
|
||||
try {
|
||||
RegionLocations metaLocations = conn.registry.getMetaRegionLocations().get(
|
||||
conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
|
||||
numOfReplicas = metaLocations.size();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
|
||||
}
|
||||
return numOfReplicas;
|
||||
});
|
||||
break;
|
||||
case NONE:
|
||||
// If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
|
||||
|
||||
boolean useMetaReplicas = conn.getConfiguration().getBoolean(USE_META_REPLICAS,
|
||||
DEFAULT_USE_META_REPLICAS);
|
||||
if (useMetaReplicas) {
|
||||
this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// Doing nothing
|
||||
}
|
||||
}
|
||||
|
||||
private TableCache getTableCache(TableName tableName) {
|
||||
|
@ -433,9 +472,24 @@ class AsyncNonMetaRegionLocator {
|
|||
Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
|
||||
.addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
|
||||
.setReadType(ReadType.PREAD);
|
||||
if (useMetaReplicas) {
|
||||
scan.setConsistency(Consistency.TIMELINE);
|
||||
|
||||
switch (this.metaReplicaMode) {
|
||||
case LOAD_BALANCE:
|
||||
int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
|
||||
if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
|
||||
// If the selector gives a non-primary meta replica region, then go with it.
|
||||
// Otherwise, just go to primary in non-hedgedRead mode.
|
||||
scan.setConsistency(Consistency.TIMELINE);
|
||||
scan.setReplicaId(metaReplicaId);
|
||||
}
|
||||
break;
|
||||
case HEDGED_READ:
|
||||
scan.setConsistency(Consistency.TIMELINE);
|
||||
break;
|
||||
default:
|
||||
// do nothing
|
||||
}
|
||||
|
||||
conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
|
||||
|
||||
private boolean completeNormally = false;
|
||||
|
@ -577,6 +631,13 @@ class AsyncNonMetaRegionLocator {
|
|||
if (!canUpdateOnError(loc, oldLoc)) {
|
||||
return;
|
||||
}
|
||||
// Tell metaReplicaSelector that the location is stale. It will create a stale entry
|
||||
// with timestamp internally. Next time the client looks up the same location,
|
||||
// it will pick a different meta replica region.
|
||||
if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
|
||||
metaReplicaSelector.onError(loc);
|
||||
}
|
||||
|
||||
RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
|
||||
if (newLocs == null) {
|
||||
if (tableCache.cache.remove(startKey, oldLocs)) {
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A Catalog replica selector decides which catalog replica to go for read requests when it is
|
||||
* configured as CatalogReplicaMode.LoadBalance.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
interface CatalogReplicaLoadBalanceSelector {
|
||||
|
||||
/**
|
||||
* This method is called when input location is stale, i.e, when clients run into
|
||||
* org.apache.hadoop.hbase.NotServingRegionException.
|
||||
* @param loc stale location
|
||||
*/
|
||||
void onError(HRegionLocation loc);
|
||||
|
||||
/**
|
||||
* Select a catalog replica region where client go to loop up the input row key.
|
||||
*
|
||||
* @param tablename table name
|
||||
* @param row key to look up
|
||||
* @param locateType locate type
|
||||
* @return replica id
|
||||
*/
|
||||
int select(TableName tablename, byte[] row, RegionLocateType locateType);
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.util.function.IntSupplier;
|
||||
import org.apache.hadoop.hbase.ChoreService;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Factory to create a {@link CatalogReplicaLoadBalanceSelector}
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
final class CatalogReplicaLoadBalanceSelectorFactory {
|
||||
/**
|
||||
* Private Constructor
|
||||
*/
|
||||
private CatalogReplicaLoadBalanceSelectorFactory() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a CatalogReplicaLoadBalanceReplicaSelector based on input config.
|
||||
* @param replicaSelectorClass Selector classname.
|
||||
* @param tableName System table name.
|
||||
* @param choreService {@link ChoreService}
|
||||
* @return {@link CatalogReplicaLoadBalanceSelector}
|
||||
*/
|
||||
public static CatalogReplicaLoadBalanceSelector createSelector(String replicaSelectorClass,
|
||||
TableName tableName, ChoreService choreService, IntSupplier getReplicaCount) {
|
||||
return ReflectionUtils.instantiateWithCustomCtor(replicaSelectorClass,
|
||||
new Class[] { TableName.class, ChoreService.class, IntSupplier.class },
|
||||
new Object[] { tableName, choreService, getReplicaCount });
|
||||
}
|
||||
}
|
|
@ -0,0 +1,303 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
||||
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
|
||||
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.function.IntSupplier;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringStyle;
|
||||
import org.apache.hadoop.hbase.ChoreService;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* <p>CatalogReplicaLoadBalanceReplicaSimpleSelector implements a simple catalog replica load
|
||||
* balancing algorithm. It maintains a stale location cache for each table. Whenever client looks
|
||||
* up location, it first check if the row is the stale location cache. If yes, the location from
|
||||
* catalog replica is stale, it will go to the primary region to look up update-to-date location;
|
||||
* otherwise, it will randomly pick up a replica region for lookup. When clients receive
|
||||
* RegionNotServedException from region servers, it will add these region locations to the stale
|
||||
* location cache. The stale cache will be cleaned up periodically by a chore.</p>
|
||||
*
|
||||
* It follows a simple algorithm to choose a replica to go:
|
||||
*
|
||||
* <ol>
|
||||
* <li>If there is no stale location entry for rows it looks up, it will randomly
|
||||
* pick a replica region to do lookup. </li>
|
||||
* <li>If the location from the replica region is stale, client gets RegionNotServedException
|
||||
* from region server, in this case, it will create StaleLocationCacheEntry in
|
||||
* CatalogReplicaLoadBalanceReplicaSimpleSelector.</li>
|
||||
* <li>When client tries to do location lookup, it checks StaleLocationCache first for rows it
|
||||
* tries to lookup, if entry exists, it will go with primary meta region to do lookup;
|
||||
* otherwise, it will follow step 1.</li>
|
||||
* <li>A chore will periodically run to clean up cache entries in the StaleLocationCache.</li>
|
||||
* </ol>
|
||||
*/
|
||||
class CatalogReplicaLoadBalanceSimpleSelector implements
|
||||
CatalogReplicaLoadBalanceSelector, Stoppable {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class);
|
||||
private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
|
||||
private final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds
|
||||
private final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute
|
||||
|
||||
/**
|
||||
* StaleLocationCacheEntry is the entry when a stale location is reported by an client.
|
||||
*/
|
||||
private static final class StaleLocationCacheEntry {
|
||||
// timestamp in milliseconds
|
||||
private final long timestamp;
|
||||
|
||||
private final byte[] endKey;
|
||||
|
||||
StaleLocationCacheEntry(final byte[] endKey) {
|
||||
this.endKey = endKey;
|
||||
timestamp = EnvironmentEdgeManager.currentTime();
|
||||
}
|
||||
|
||||
public byte[] getEndKey() {
|
||||
return this.endKey;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return this.timestamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
|
||||
.append("endKey", endKey)
|
||||
.append("timestamp", timestamp)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>>
|
||||
staleCache = new ConcurrentHashMap<>();
|
||||
private volatile int numOfReplicas;
|
||||
private final ChoreService choreService;
|
||||
private final TableName tableName;
|
||||
private final IntSupplier getNumOfReplicas;
|
||||
private volatile boolean isStopped = false;
|
||||
private final static int UNINITIALIZED_NUM_OF_REPLICAS = -1;
|
||||
|
||||
CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, ChoreService choreService,
|
||||
IntSupplier getNumOfReplicas) {
|
||||
this.choreService = choreService;
|
||||
this.tableName = tableName;
|
||||
this.getNumOfReplicas = getNumOfReplicas;
|
||||
|
||||
// This numOfReplicas is going to be lazy initialized.
|
||||
this.numOfReplicas = UNINITIALIZED_NUM_OF_REPLICAS;
|
||||
// Start chores
|
||||
this.choreService.scheduleChore(getCacheCleanupChore(this));
|
||||
this.choreService.scheduleChore(getRefreshReplicaCountChore(this));
|
||||
}
|
||||
|
||||
/**
|
||||
* When a client runs into RegionNotServingException, it will call this method to
|
||||
* update Selector's internal state.
|
||||
* @param loc the location which causes exception.
|
||||
*/
|
||||
public void onError(HRegionLocation loc) {
|
||||
ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache =
|
||||
computeIfAbsent(staleCache, loc.getRegion().getTable(),
|
||||
() -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR));
|
||||
byte[] startKey = loc.getRegion().getStartKey();
|
||||
tableCache.putIfAbsent(startKey,
|
||||
new StaleLocationCacheEntry(loc.getRegion().getEndKey()));
|
||||
LOG.debug("Add entry to stale cache for table {} with startKey {}, {}",
|
||||
loc.getRegion().getTable(), startKey, loc.getRegion().getEndKey());
|
||||
}
|
||||
|
||||
/**
|
||||
* Select an random replica id. In case there is no replica region configured, return
|
||||
* the primary replica id.
|
||||
* @return Replica id
|
||||
*/
|
||||
private int getRandomReplicaId() {
|
||||
int cachedNumOfReplicas = this.numOfReplicas;
|
||||
if (cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) {
|
||||
cachedNumOfReplicas = refreshCatalogReplicaCount();
|
||||
this.numOfReplicas = cachedNumOfReplicas;
|
||||
}
|
||||
// In case of no replica configured, return the primary region id.
|
||||
if (cachedNumOfReplicas <= 1) {
|
||||
return RegionInfo.DEFAULT_REPLICA_ID;
|
||||
}
|
||||
return 1 + ThreadLocalRandom.current().nextInt(cachedNumOfReplicas - 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* When it looks up a location, it will call this method to find a replica region to go.
|
||||
* For a normal case, > 99% of region locations from catalog/meta replica will be up to date.
|
||||
* In extreme cases such as region server crashes, it will depends on how fast replication
|
||||
* catches up.
|
||||
*
|
||||
* @param tablename table name it looks up
|
||||
* @param row key it looks up.
|
||||
* @param locateType locateType, Only BEFORE and CURRENT will be passed in.
|
||||
* @return catalog replica id
|
||||
*/
|
||||
public int select(final TableName tablename, final byte[] row,
|
||||
final RegionLocateType locateType) {
|
||||
Preconditions.checkArgument(locateType == RegionLocateType.BEFORE ||
|
||||
locateType == RegionLocateType.CURRENT,
|
||||
"Expected type BEFORE or CURRENT but got: %s", locateType);
|
||||
|
||||
ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tablename);
|
||||
|
||||
// If there is no entry in StaleCache, select a random replica id.
|
||||
if (tableCache == null) {
|
||||
return getRandomReplicaId();
|
||||
}
|
||||
|
||||
Map.Entry<byte[], StaleLocationCacheEntry> entry;
|
||||
boolean isEmptyStopRow = isEmptyStopRow(row);
|
||||
// Only BEFORE and CURRENT are passed in.
|
||||
if (locateType == RegionLocateType.BEFORE) {
|
||||
entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row);
|
||||
} else {
|
||||
entry = tableCache.floorEntry(row);
|
||||
}
|
||||
|
||||
// It is not in the stale cache, return a random replica id.
|
||||
if (entry == null) {
|
||||
return getRandomReplicaId();
|
||||
}
|
||||
|
||||
// The entry here is a possible match for the location. Check if the entry times out first as
|
||||
// long comparing is faster than comparing byte arrays(in most cases). It could remove
|
||||
// stale entries faster. If the possible match entry does not time out, it will check if
|
||||
// the entry is a match for the row passed in and select the replica id accordingly.
|
||||
if ((EnvironmentEdgeManager.currentTime() - entry.getValue().getTimestamp()) >=
|
||||
STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
|
||||
LOG.debug("Entry for table {} with startKey {}, {} times out", tablename, entry.getKey(),
|
||||
entry);
|
||||
tableCache.remove(entry.getKey());
|
||||
return getRandomReplicaId();
|
||||
}
|
||||
|
||||
byte[] endKey = entry.getValue().getEndKey();
|
||||
|
||||
// The following logic is borrowed from AsyncNonMetaRegionLocator.
|
||||
if (isEmptyStopRow(endKey)) {
|
||||
LOG.debug("Lookup {} goes to primary region", row);
|
||||
return RegionInfo.DEFAULT_REPLICA_ID;
|
||||
}
|
||||
|
||||
if (locateType == RegionLocateType.BEFORE) {
|
||||
if (!isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0) {
|
||||
LOG.debug("Lookup {} goes to primary meta", row);
|
||||
return RegionInfo.DEFAULT_REPLICA_ID;
|
||||
}
|
||||
} else {
|
||||
if (Bytes.compareTo(row, endKey) < 0) {
|
||||
LOG.debug("Lookup {} goes to primary meta", row);
|
||||
return RegionInfo.DEFAULT_REPLICA_ID;
|
||||
}
|
||||
}
|
||||
|
||||
// Not in stale cache, return a random replica id.
|
||||
return getRandomReplicaId();
|
||||
}
|
||||
|
||||
// This class implements the Stoppable interface as chores needs a Stopable object, there is
|
||||
// no-op on this Stoppable object currently.
|
||||
@Override
|
||||
public void stop(String why) {
|
||||
isStopped = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStopped() {
|
||||
return isStopped;
|
||||
}
|
||||
|
||||
private void cleanupReplicaReplicaStaleCache() {
|
||||
long curTimeInMills = EnvironmentEdgeManager.currentTime();
|
||||
for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) {
|
||||
Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it =
|
||||
tableCache.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next();
|
||||
if (curTimeInMills - entry.getValue().getTimestamp() >=
|
||||
STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
|
||||
LOG.debug("clean entry {}, {} from stale cache", entry.getKey(), entry.getValue());
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int refreshCatalogReplicaCount() {
|
||||
int newNumOfReplicas = this.getNumOfReplicas.getAsInt();
|
||||
LOG.debug("Refreshed replica count {}", newNumOfReplicas);
|
||||
if (newNumOfReplicas == 1) {
|
||||
LOG.warn("Table {}'s region replica count is 1, maybe a misconfiguration or failure to "
|
||||
+ "fetch the replica count", tableName);
|
||||
}
|
||||
int cachedNumOfReplicas = this.numOfReplicas;
|
||||
|
||||
// If the returned number of replicas is 1, it is mostly caused by failure to fetch the
|
||||
// replica count. Do not update the numOfReplicas in this case.
|
||||
if ((cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) ||
|
||||
((cachedNumOfReplicas != newNumOfReplicas) && (newNumOfReplicas != 1))) {
|
||||
this.numOfReplicas = newNumOfReplicas;
|
||||
}
|
||||
return newNumOfReplicas;
|
||||
}
|
||||
|
||||
private ScheduledChore getCacheCleanupChore(
|
||||
final CatalogReplicaLoadBalanceSimpleSelector selector) {
|
||||
return new ScheduledChore("CleanupCatalogReplicaStaleCache", this,
|
||||
STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) {
|
||||
@Override
|
||||
protected void chore() {
|
||||
selector.cleanupReplicaReplicaStaleCache();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private ScheduledChore getRefreshReplicaCountChore(
|
||||
final CatalogReplicaLoadBalanceSimpleSelector selector) {
|
||||
return new ScheduledChore("RefreshReplicaCountChore", this,
|
||||
REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) {
|
||||
@Override
|
||||
protected void chore() {
|
||||
selector.refreshCatalogReplicaCount();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* <p>There are two modes with catalog replica support. </p>
|
||||
*
|
||||
* <ol>
|
||||
* <li>HEDGED_READ - Client sends requests to the primary region first, within a
|
||||
* configured amount of time, if there is no response coming back,
|
||||
* client sends requests to all replica regions and takes the first
|
||||
* response. </li>
|
||||
*
|
||||
* <li>LOAD_BALANCE - Client sends requests to replica regions in a round-robin mode,
|
||||
* if results from replica regions are stale, next time, client sends requests for
|
||||
* these stale locations to the primary region. In this mode, scan
|
||||
* requests are load balanced across all replica regions.</li>
|
||||
* </ol>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
enum CatalogReplicaMode {
|
||||
NONE {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "None";
|
||||
}
|
||||
},
|
||||
HEDGED_READ {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HedgedRead";
|
||||
}
|
||||
},
|
||||
LOAD_BALANCE {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "LoadBalance";
|
||||
}
|
||||
};
|
||||
|
||||
public static CatalogReplicaMode fromString(final String value) {
|
||||
for(CatalogReplicaMode mode : values()) {
|
||||
if (mode.toString().equalsIgnoreCase(value)) {
|
||||
return mode;
|
||||
}
|
||||
}
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
}
|
|
@ -18,10 +18,14 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
|
||||
import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
|
||||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
|
||||
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
|
||||
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
|
||||
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsentEx;
|
||||
|
||||
|
@ -161,7 +165,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
private final boolean hostnamesCanChange;
|
||||
private final long pause;
|
||||
private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
|
||||
private boolean useMetaReplicas;
|
||||
// The mode tells if HedgedRead, LoadBalance mode is supported.
|
||||
// The default mode is CatalogReplicaMode.None.
|
||||
private CatalogReplicaMode metaReplicaMode;
|
||||
private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
|
||||
|
||||
private final int metaReplicaCallTimeoutScanInMicroSecond;
|
||||
private final int numTries;
|
||||
final int rpcTimeout;
|
||||
|
@ -232,7 +240,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
/** lock guards against multiple threads trying to query the meta region at the same time */
|
||||
private final ReentrantLock userRegionLock = new ReentrantLock();
|
||||
|
||||
private ChoreService authService;
|
||||
private ChoreService choreService;
|
||||
|
||||
/**
|
||||
* constructor
|
||||
|
@ -258,8 +266,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
} else {
|
||||
this.pauseForCQTBE = configuredPauseForCQTBE;
|
||||
}
|
||||
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
|
||||
HConstants.DEFAULT_USE_META_REPLICAS);
|
||||
this.metaReplicaCallTimeoutScanInMicroSecond =
|
||||
connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();
|
||||
|
||||
|
@ -332,19 +338,47 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
close();
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Get the region locator's meta replica mode.
|
||||
this.metaReplicaMode = CatalogReplicaMode.fromString(conf.get(LOCATOR_META_REPLICAS_MODE,
|
||||
CatalogReplicaMode.NONE.toString()));
|
||||
|
||||
switch (this.metaReplicaMode) {
|
||||
case LOAD_BALANCE:
|
||||
String replicaSelectorClass = conf.get(
|
||||
RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
|
||||
CatalogReplicaLoadBalanceSimpleSelector.class.getName());
|
||||
|
||||
this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
|
||||
replicaSelectorClass, META_TABLE_NAME, getChoreService(), () -> {
|
||||
int numOfReplicas = 1;
|
||||
try {
|
||||
RegionLocations metaLocations = registry.getMetaRegionLocations().get(
|
||||
connectionConfig.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
|
||||
numOfReplicas = metaLocations.size();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
|
||||
}
|
||||
return numOfReplicas;
|
||||
});
|
||||
break;
|
||||
case NONE:
|
||||
// If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
|
||||
|
||||
boolean useMetaReplicas = conf.getBoolean(USE_META_REPLICAS,
|
||||
DEFAULT_USE_META_REPLICAS);
|
||||
if (useMetaReplicas) {
|
||||
this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// Doing nothing
|
||||
}
|
||||
}
|
||||
|
||||
private void spawnRenewalChore(final UserGroupInformation user) {
|
||||
authService = new ChoreService("Relogin service");
|
||||
authService.scheduleChore(AuthUtil.getAuthRenewalChore(user));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param useMetaReplicas
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void setUseMetaReplicas(final boolean useMetaReplicas) {
|
||||
this.useMetaReplicas = useMetaReplicas;
|
||||
ChoreService service = getChoreService();
|
||||
service.scheduleChore(AuthUtil.getAuthRenewalChore(user));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -580,6 +614,17 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If choreService has not been created yet, create the ChoreService.
|
||||
* @return ChoreService
|
||||
*/
|
||||
synchronized ChoreService getChoreService() {
|
||||
if (choreService == null) {
|
||||
choreService = new ChoreService("AsyncConn Chore Service");
|
||||
}
|
||||
return choreService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return this.conf;
|
||||
|
@ -841,8 +886,23 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
|
||||
.addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
|
||||
.setReadType(ReadType.PREAD);
|
||||
if (this.useMetaReplicas) {
|
||||
s.setConsistency(Consistency.TIMELINE);
|
||||
|
||||
switch (this.metaReplicaMode) {
|
||||
case LOAD_BALANCE:
|
||||
int metaReplicaId = this.metaReplicaSelector.select(tableName, row,
|
||||
RegionLocateType.CURRENT);
|
||||
if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
|
||||
// If the selector gives a non-primary meta replica region, then go with it.
|
||||
// Otherwise, just go to primary in non-hedgedRead mode.
|
||||
s.setConsistency(Consistency.TIMELINE);
|
||||
s.setReplicaId(metaReplicaId);
|
||||
}
|
||||
break;
|
||||
case HEDGED_READ:
|
||||
s.setConsistency(Consistency.TIMELINE);
|
||||
break;
|
||||
default:
|
||||
// do nothing
|
||||
}
|
||||
int maxAttempts = (retry ? numTries : 1);
|
||||
boolean relocateMeta = false;
|
||||
|
@ -1980,6 +2040,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
metrics.incrCacheDroppingExceptions(exception);
|
||||
}
|
||||
|
||||
// Tell metaReplicaSelector that the location is stale. It will create a stale entry
|
||||
// with timestamp internally. Next time the client looks up the same location,
|
||||
// it will pick a different meta replica region.
|
||||
if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
|
||||
metaReplicaSelector.onError(oldLocation);
|
||||
}
|
||||
|
||||
// If we're here, it means that can cannot be sure about the location, so we remove it from
|
||||
// the cache. Do not send the source because source can be a new server in the same host:port
|
||||
metaCache.clearCache(regionInfo);
|
||||
|
@ -2050,8 +2117,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
if (rpcClient != null) {
|
||||
rpcClient.close();
|
||||
}
|
||||
if (authService != null) {
|
||||
authService.shutdown();
|
||||
if (choreService != null) {
|
||||
choreService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,18 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
*/
|
||||
@InterfaceAudience.Public
|
||||
public interface RegionLocator extends Closeable {
|
||||
|
||||
/** Configuration for Region Locator's mode when meta replica is configured.
|
||||
* Valid values are: HedgedRead, LoadBalance, None
|
||||
*/
|
||||
String LOCATOR_META_REPLICAS_MODE = "hbase.locator.meta.replicas.mode";
|
||||
|
||||
/** Configuration for meta replica selector when Region Locator's LoadBalance mode is configured.
|
||||
* The default value is org.apache.hadoop.hbase.client.CatalogReplicaLoadBalanceSimpleSelector.
|
||||
*/
|
||||
String LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR =
|
||||
"hbase.locator.meta.replicas.mode.loadbalance.selector";
|
||||
|
||||
/**
|
||||
* Finds the region on which the given row is being served. Does not reload the cache.
|
||||
* @param row Row to find.
|
||||
|
|
|
@ -237,7 +237,6 @@ public class ResultBoundedCompletionService<V> {
|
|||
return f;
|
||||
}
|
||||
|
||||
// impossible to reach
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -45,10 +45,11 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
|
||||
/**
|
||||
* This class has the logic for handling scanners for regions with and without replicas.
|
||||
* 1. A scan is attempted on the default (primary) region
|
||||
* 2. The scanner sends all the RPCs to the default region until it is done, or, there
|
||||
* is a timeout on the default (a timeout of zero is disallowed).
|
||||
* 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
|
||||
* 1. A scan is attempted on the default (primary) region, or a specific region.
|
||||
* 2. The scanner sends all the RPCs to the default/specific region until it is done, or, there
|
||||
* is a timeout on the default/specific region (a timeout of zero is disallowed).
|
||||
* 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s) only
|
||||
* for Consistency.TIMELINE without specific replica id specified.
|
||||
* 4. The results from the first successful scanner are taken, and it is stored which server
|
||||
* returned the results.
|
||||
* 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
|
||||
|
@ -160,7 +161,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
RegionLocations rl = null;
|
||||
try {
|
||||
rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
|
||||
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
|
||||
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
|
||||
currentScannerCallable.getRow());
|
||||
} catch (RetriesExhaustedException | DoNotRetryIOException e) {
|
||||
// We cannot get the primary replica region location, it is possible that the region server
|
||||
|
|
|
@ -142,6 +142,21 @@ public final class RegionReplicaTestHelper {
|
|||
locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
|
||||
.getDefaultRegionLocation().getServerName());
|
||||
// should get the new location when reload = true
|
||||
// when meta replica LoadBalance mode is enabled, it may delay a bit.
|
||||
util.waitFor(3000, new ExplainingPredicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
ServerName sn = locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID,
|
||||
true).getDefaultRegionLocation().getServerName();
|
||||
return newServerName.equals(sn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String explainFailure() throws Exception {
|
||||
return "New location does not show up in meta (replica) region";
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(newServerName,
|
||||
locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true)
|
||||
.getDefaultRegionLocation().getServerName());
|
||||
|
|
|
@ -29,12 +29,15 @@ import static org.junit.Assert.assertThat;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
|
@ -49,41 +52,61 @@ import org.apache.hadoop.hbase.security.User;
|
|||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestAsyncNonMetaRegionLocator {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestAsyncNonMetaRegionLocator.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName TABLE_NAME = TableName.valueOf("async");
|
||||
|
||||
private static byte[] FAMILY = Bytes.toBytes("cf");
|
||||
private static final int META_STOREFILE_REFRESH_PERIOD = 100;
|
||||
private static final int NB_SERVERS = 4;
|
||||
private static int numOfMetaReplica = NB_SERVERS - 1;
|
||||
|
||||
private static AsyncConnectionImpl CONN;
|
||||
|
||||
private static AsyncNonMetaRegionLocator LOCATOR;
|
||||
private static ConnectionRegistry registry;
|
||||
|
||||
private static byte[][] SPLIT_KEYS;
|
||||
private CatalogReplicaMode metaReplicaMode;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
TEST_UTIL.getAdmin().balancerSwitch(false, true);
|
||||
ConnectionRegistry registry =
|
||||
ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
|
||||
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
|
||||
registry.getClusterId().get(), User.getCurrent());
|
||||
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
|
||||
// Enable hbase:meta replication.
|
||||
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
|
||||
conf.setLong("replication.source.sleepforretries", 10); // 10 ms
|
||||
TEST_UTIL.startMiniCluster(NB_SERVERS);
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
admin.balancerSwitch(false, true);
|
||||
// Enable hbase:meta replication.
|
||||
HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, numOfMetaReplica);
|
||||
TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster().getRegions(
|
||||
TableName.META_TABLE_NAME).size() >= numOfMetaReplica);
|
||||
|
||||
registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
|
||||
SPLIT_KEYS = new byte[8][];
|
||||
for (int i = 111; i < 999; i += 111) {
|
||||
SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
|
||||
|
@ -108,6 +131,26 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
LOCATOR.clearCache(TABLE_NAME);
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Object[]> parameters() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{ null },
|
||||
{ CatalogReplicaMode.LOAD_BALANCE.toString() }
|
||||
});
|
||||
}
|
||||
|
||||
public TestAsyncNonMetaRegionLocator(String clientMetaReplicaMode) throws Exception {
|
||||
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||
// Enable meta replica LoadBalance mode for this connection.
|
||||
if (clientMetaReplicaMode != null) {
|
||||
c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, clientMetaReplicaMode);
|
||||
metaReplicaMode = CatalogReplicaMode.fromString(clientMetaReplicaMode);
|
||||
}
|
||||
|
||||
CONN = new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), User.getCurrent());
|
||||
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
|
||||
}
|
||||
|
||||
private void createSingleRegionTable() throws IOException, InterruptedException {
|
||||
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
|
||||
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||
|
@ -347,8 +390,21 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
|
||||
}
|
||||
// should get the new location when reload = true
|
||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
|
||||
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
|
||||
// when meta replica LoadBalance mode is enabled, it may delay a bit.
|
||||
TEST_UTIL.waitFor(3000, new ExplainingPredicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
HRegionLocation loc = getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW,
|
||||
RegionLocateType.CURRENT, true).get();
|
||||
return newServerName.equals(loc.getServerName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String explainFailure() throws Exception {
|
||||
return "New location does not show up in meta (replica) region";
|
||||
}
|
||||
});
|
||||
|
||||
// the cached location should be replaced
|
||||
for (RegionLocateType locateType : RegionLocateType.values()) {
|
||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -59,8 +60,10 @@ import org.junit.AfterClass;
|
|||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -84,6 +87,9 @@ public class TestReplicaWithCluster {
|
|||
private final static int REFRESH_PERIOD = 1000;
|
||||
private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
/**
|
||||
* This copro is used to synchronize the tests.
|
||||
*/
|
||||
|
@ -280,7 +286,7 @@ public class TestReplicaWithCluster {
|
|||
@Test
|
||||
public void testCreateDeleteTable() throws IOException {
|
||||
// Create table then get the single region for our new table.
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
|
||||
hdt.setRegionReplication(NB_SERVERS);
|
||||
hdt.addCoprocessor(SlowMeCopro.class.getName());
|
||||
Table table = HTU.createTable(hdt, new byte[][]{f}, null);
|
||||
|
@ -312,7 +318,7 @@ public class TestReplicaWithCluster {
|
|||
|
||||
@Test
|
||||
public void testChangeTable() throws Exception {
|
||||
TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
|
||||
TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.setRegionReplication(NB_SERVERS)
|
||||
.setCoprocessor(SlowMeCopro.class.getName())
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(f))
|
||||
|
@ -372,7 +378,7 @@ public class TestReplicaWithCluster {
|
|||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testReplicaAndReplication() throws Exception {
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication");
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
|
||||
hdt.setRegionReplication(NB_SERVERS);
|
||||
|
||||
HColumnDescriptor fam = new HColumnDescriptor(row);
|
||||
|
@ -458,14 +464,14 @@ public class TestReplicaWithCluster {
|
|||
public void testBulkLoad() throws IOException {
|
||||
// Create table then get the single region for our new table.
|
||||
LOG.debug("Creating test table");
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad");
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
|
||||
hdt.setRegionReplication(NB_SERVERS);
|
||||
hdt.addCoprocessor(SlowMeCopro.class.getName());
|
||||
Table table = HTU.createTable(hdt, new byte[][]{f}, null);
|
||||
|
||||
// create hfiles to load.
|
||||
LOG.debug("Creating test data");
|
||||
Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
|
||||
Path dir = HTU.getDataTestDirOnTestFS(name.getMethodName());
|
||||
final int numRows = 10;
|
||||
final byte[] qual = Bytes.toBytes("qual");
|
||||
final byte[] val = Bytes.toBytes("val");
|
||||
|
@ -537,7 +543,7 @@ public class TestReplicaWithCluster {
|
|||
@Test
|
||||
public void testReplicaGetWithPrimaryDown() throws IOException {
|
||||
// Create table then get the single region for our new table.
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
|
||||
hdt.setRegionReplication(NB_SERVERS);
|
||||
hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
|
||||
try {
|
||||
|
@ -571,7 +577,7 @@ public class TestReplicaWithCluster {
|
|||
@Test
|
||||
public void testReplicaScanWithPrimaryDown() throws IOException {
|
||||
// Create table then get the single region for our new table.
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
|
||||
hdt.setRegionReplication(NB_SERVERS);
|
||||
hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
|
||||
|
||||
|
@ -618,7 +624,7 @@ public class TestReplicaWithCluster {
|
|||
HTU.getConfiguration().set(
|
||||
"hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
|
||||
// Create table then get the single region for our new table.
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithAsyncRpcClientImpl");
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
|
||||
hdt.setRegionReplication(NB_SERVERS);
|
||||
hdt.addCoprocessor(SlowMeCopro.class.getName());
|
||||
|
||||
|
@ -669,11 +675,12 @@ public class TestReplicaWithCluster {
|
|||
@Test
|
||||
public void testGetRegionLocationFromPrimaryMetaRegion() throws IOException, InterruptedException {
|
||||
HTU.getAdmin().setBalancerRunning(false, true);
|
||||
|
||||
((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(true);
|
||||
Configuration conf = new Configuration(HTU.getConfiguration());
|
||||
conf.setBoolean(USE_META_REPLICAS, true);
|
||||
Connection conn = ConnectionFactory.createConnection(conf);
|
||||
|
||||
// Create table then get the single region for our new table.
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor("testGetRegionLocationFromPrimaryMetaRegion");
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
|
||||
hdt.setRegionReplication(2);
|
||||
try {
|
||||
|
||||
|
@ -682,12 +689,11 @@ public class TestReplicaWithCluster {
|
|||
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;
|
||||
|
||||
// Get user table location, always get it from the primary meta replica
|
||||
RegionLocations url = ((ClusterConnection) HTU.getConnection())
|
||||
RegionLocations url = ((ClusterConnection)conn)
|
||||
.locateRegion(hdt.getTableName(), row, false, false);
|
||||
|
||||
} finally {
|
||||
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
|
||||
((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
|
||||
HTU.getAdmin().setBalancerRunning(true, true);
|
||||
HTU.getAdmin().disableTable(hdt.getTableName());
|
||||
HTU.deleteTable(hdt.getTableName());
|
||||
|
@ -703,22 +709,25 @@ public class TestReplicaWithCluster {
|
|||
public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
|
||||
HTU.getAdmin().setBalancerRunning(false, true);
|
||||
|
||||
((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(true);
|
||||
Configuration conf = new Configuration(HTU.getConfiguration());
|
||||
conf.setBoolean(USE_META_REPLICAS, true);
|
||||
Connection conn = ConnectionFactory.createConnection(conf);
|
||||
|
||||
// Create table then get the single region for our new table.
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
|
||||
HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
|
||||
hdt.setRegionReplication(2);
|
||||
try {
|
||||
|
||||
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
|
||||
HTU.createTable(hdt, new byte[][] { f }, null);
|
||||
Table table = conn.getTable(TableName.valueOf(name.getMethodName()));
|
||||
|
||||
// Get Meta location
|
||||
RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
|
||||
RegionLocations mrl = ((ClusterConnection)conn)
|
||||
.locateRegion(TableName.META_TABLE_NAME,
|
||||
HConstants.EMPTY_START_ROW, false, false);
|
||||
|
||||
// Get user table location
|
||||
RegionLocations url = ((ClusterConnection) HTU.getConnection())
|
||||
RegionLocations url = ((ClusterConnection)conn)
|
||||
.locateRegion(hdt.getTableName(), row, false, false);
|
||||
|
||||
// Make sure that user primary region is co-hosted with the meta region
|
||||
|
@ -738,11 +747,11 @@ public class TestReplicaWithCluster {
|
|||
|
||||
// Wait until the meta table is updated with new location info
|
||||
while (true) {
|
||||
mrl = ((ClusterConnection) HTU.getConnection())
|
||||
mrl = ((ClusterConnection)conn)
|
||||
.locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);
|
||||
|
||||
// Get user table location
|
||||
url = ((ClusterConnection) HTU.getConnection())
|
||||
url = ((ClusterConnection)conn)
|
||||
.locateRegion(hdt.getTableName(), row, false, true);
|
||||
|
||||
LOG.info("meta locations " + mrl);
|
||||
|
@ -783,9 +792,7 @@ public class TestReplicaWithCluster {
|
|||
// The second Get will succeed as well
|
||||
r = table.get(g);
|
||||
Assert.assertTrue(r.isStale());
|
||||
|
||||
} finally {
|
||||
((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
|
||||
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
|
||||
HTU.getAdmin().setBalancerRunning(true, true);
|
||||
HTU.getAdmin().disableTable(hdt.getTableName());
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
@ -38,6 +40,10 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
|
|||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
@ -62,17 +68,20 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
|
||||
* verifying async wal replication replays the edits to the secondary region in various scenarios.
|
||||
*
|
||||
* @see TestRegionReplicaReplicationEndpoint
|
||||
*/
|
||||
@Category({LargeTests.class})
|
||||
public class TestMetaRegionReplicaReplicationEndpoint {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
|
||||
HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
|
||||
private static final int NB_SERVERS = 3;
|
||||
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
|
||||
LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
|
||||
private static final int NB_SERVERS = 4;
|
||||
private final HBaseTestingUtility HTU = new HBaseTestingUtility();
|
||||
private int numOfMetaReplica = NB_SERVERS - 1;
|
||||
private static byte[] VALUE = Bytes.toBytes("value");
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
@ -92,12 +101,17 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
|
||||
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
|
||||
// Enable hbase:meta replication.
|
||||
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
|
||||
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
|
||||
true);
|
||||
// Set hbase:meta replicas to be 3.
|
||||
conf.setInt(HConstants.META_REPLICAS_NUM, NB_SERVERS);
|
||||
// conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
|
||||
HTU.startMiniCluster(NB_SERVERS);
|
||||
// Enable hbase:meta replication.
|
||||
HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
|
||||
|
||||
HTU.waitFor(30000,
|
||||
() -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= NB_SERVERS);
|
||||
() -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
|
||||
>= numOfMetaReplica);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -128,7 +142,7 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
assertNotNull(hrsOther);
|
||||
assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
|
||||
Region meta = null;
|
||||
for (Region region: hrs.getOnlineRegionsLocalContext()) {
|
||||
for (Region region : hrs.getOnlineRegionsLocalContext()) {
|
||||
if (region.getRegionInfo().isMetaRegion()) {
|
||||
meta = region;
|
||||
break;
|
||||
|
@ -156,8 +170,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
private void testHBaseMetaReplicatesOneRow(int i) throws Exception {
|
||||
waitForMetaReplicasToOnline();
|
||||
try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_" + i),
|
||||
HConstants.CATALOG_FAMILY)) {
|
||||
verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
|
||||
HConstants.CATALOG_FAMILY)) {
|
||||
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -175,18 +189,136 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
*/
|
||||
@Test
|
||||
public void testHBaseMetaReplicates() throws Exception {
|
||||
try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
|
||||
HConstants.CATALOG_FAMILY,
|
||||
Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
|
||||
verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
|
||||
try (Table table = HTU
|
||||
.createTable(TableName.valueOf(this.name.getMethodName() + "_0"), HConstants.CATALOG_FAMILY,
|
||||
Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
|
||||
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
|
||||
}
|
||||
try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
|
||||
HConstants.CATALOG_FAMILY,
|
||||
Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
|
||||
verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
|
||||
try (Table table = HTU
|
||||
.createTable(TableName.valueOf(this.name.getMethodName() + "_1"), HConstants.CATALOG_FAMILY,
|
||||
Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
|
||||
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
|
||||
// Try delete.
|
||||
HTU.deleteTableIfAny(table.getName());
|
||||
verifyDeletedReplication(TableName.META_TABLE_NAME, NB_SERVERS, table.getName());
|
||||
verifyDeletedReplication(TableName.META_TABLE_NAME, numOfMetaReplica, table.getName());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
|
||||
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
|
||||
TableName tableName = TableName.valueOf("hbase:meta");
|
||||
Table table = connection.getTable(tableName);
|
||||
try {
|
||||
// load the data to the table
|
||||
for (int i = 0; i < 5; i++) {
|
||||
LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
|
||||
HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
|
||||
LOG.info("flushing table");
|
||||
HTU.flush(tableName);
|
||||
LOG.info("compacting table");
|
||||
if (i < 4) {
|
||||
HTU.compact(tableName, false);
|
||||
}
|
||||
}
|
||||
|
||||
verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
|
||||
} finally {
|
||||
table.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
|
||||
MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
|
||||
HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
|
||||
|
||||
HRegionServer hrsMetaReplica = null;
|
||||
HRegionServer hrsNoMetaReplica = null;
|
||||
HRegionServer server = null;
|
||||
Region metaReplica = null;
|
||||
boolean hostingMeta;
|
||||
|
||||
for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
|
||||
server = cluster.getRegionServer(i);
|
||||
hostingMeta = false;
|
||||
if (server == hrs) {
|
||||
continue;
|
||||
}
|
||||
for (Region region : server.getOnlineRegionsLocalContext()) {
|
||||
if (region.getRegionInfo().isMetaRegion()) {
|
||||
if (metaReplica == null) {
|
||||
metaReplica = region;
|
||||
}
|
||||
hostingMeta = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!hostingMeta) {
|
||||
hrsNoMetaReplica = server;
|
||||
}
|
||||
}
|
||||
|
||||
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
|
||||
TableName tableName = TableName.valueOf("hbase:meta");
|
||||
Table table = connection.getTable(tableName);
|
||||
try {
|
||||
// load the data to the table
|
||||
for (int i = 0; i < 5; i++) {
|
||||
LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
|
||||
HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
|
||||
if (i == 0) {
|
||||
HTU.moveRegionAndWait(metaReplica.getRegionInfo(), hrsNoMetaReplica.getServerName());
|
||||
}
|
||||
}
|
||||
|
||||
verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
|
||||
} finally {
|
||||
table.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected void verifyReplication(TableName tableName, int regionReplication, final int startRow,
|
||||
final int endRow, final byte[] family) throws Exception {
|
||||
verifyReplication(tableName, regionReplication, startRow, endRow, family, true);
|
||||
}
|
||||
|
||||
private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
|
||||
final int endRow, final byte[] family, final boolean present) throws Exception {
|
||||
// find the regions
|
||||
final Region[] regions = new Region[regionReplication];
|
||||
|
||||
for (int i = 0; i < NB_SERVERS; i++) {
|
||||
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
|
||||
List<HRegion> onlineRegions = rs.getRegions(tableName);
|
||||
for (HRegion region : onlineRegions) {
|
||||
regions[region.getRegionInfo().getReplicaId()] = region;
|
||||
}
|
||||
}
|
||||
|
||||
for (Region region : regions) {
|
||||
assertNotNull(region);
|
||||
}
|
||||
|
||||
for (int i = 1; i < regionReplication; i++) {
|
||||
final Region region = regions[i];
|
||||
// wait until all the data is replicated to all secondary regions
|
||||
Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
LOG.info("verifying replication for region replica:" + region.getRegionInfo());
|
||||
try {
|
||||
HTU.verifyNumericRows(region, family, startRow, endRow, present);
|
||||
} catch (Throwable ex) {
|
||||
LOG.warn("Verification from secondary region is not complete yet", ex);
|
||||
// still wait
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -200,10 +332,10 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
// getRegionLocations returns an entry for each replica but if unassigned, entry is null.
|
||||
// Pass reload to force us to skip cache else it just keeps returning default.
|
||||
() -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
|
||||
filter(Objects::nonNull).count() >= NB_SERVERS);
|
||||
filter(Objects::nonNull).count() >= numOfMetaReplica);
|
||||
List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
|
||||
LOG.info("Found locations {}", locations);
|
||||
assertEquals(NB_SERVERS, locations.size());
|
||||
assertEquals(numOfMetaReplica, locations.size());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -224,7 +356,7 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
/**
|
||||
* @return All Regions for tableName including Replicas.
|
||||
*/
|
||||
private Region [] getAllRegions(TableName tableName, int replication) {
|
||||
private Region[] getAllRegions(TableName tableName, int replication) {
|
||||
final Region[] regions = new Region[replication];
|
||||
for (int i = 0; i < NB_SERVERS; i++) {
|
||||
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
|
||||
|
@ -239,12 +371,23 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
return regions;
|
||||
}
|
||||
|
||||
private Region getOneRegion(TableName tableName) {
|
||||
for (int i = 0; i < NB_SERVERS; i++) {
|
||||
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
|
||||
List<HRegion> onlineRegions = rs.getRegions(tableName);
|
||||
if (onlineRegions.size() > 1) {
|
||||
return onlineRegions.get(0);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify when a Table is deleted from primary, then there are no references in replicas
|
||||
* (because they get the delete of the table rows too).
|
||||
*/
|
||||
private void verifyDeletedReplication(TableName tableName, int regionReplication,
|
||||
final TableName deletedTableName) {
|
||||
final TableName deletedTableName) {
|
||||
final Region[] regions = getAllRegions(tableName, regionReplication);
|
||||
|
||||
// Start count at '1' so we skip default, primary replica and only look at secondaries.
|
||||
|
@ -261,7 +404,7 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
continue;
|
||||
}
|
||||
return doesNotContain(cells, deletedTableName);
|
||||
} catch(Throwable ex) {
|
||||
} catch (Throwable ex) {
|
||||
LOG.warn("Verification from secondary region is not complete yet", ex);
|
||||
// still wait
|
||||
return false;
|
||||
|
@ -277,7 +420,7 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
* <code>cells</code>.
|
||||
*/
|
||||
private boolean doesNotContain(List<Cell> cells, TableName tableName) {
|
||||
for (Cell cell: cells) {
|
||||
for (Cell cell : cells) {
|
||||
String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
|
||||
if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) {
|
||||
return false;
|
||||
|
@ -290,7 +433,7 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
* Verify Replicas have results (exactly).
|
||||
*/
|
||||
private void verifyReplication(TableName tableName, int regionReplication,
|
||||
List<Result> contains) {
|
||||
List<Result> contains) {
|
||||
final Region[] regions = getAllRegions(tableName, regionReplication);
|
||||
|
||||
// Start count at '1' so we skip default, primary replica and only look at secondaries.
|
||||
|
@ -307,7 +450,7 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
continue;
|
||||
}
|
||||
return contains(contains, cells);
|
||||
} catch(Throwable ex) {
|
||||
} catch (Throwable ex) {
|
||||
LOG.warn("Verification from secondary region is not complete yet", ex);
|
||||
// still wait
|
||||
return false;
|
||||
|
@ -337,4 +480,144 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||
}
|
||||
return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
|
||||
}
|
||||
|
||||
private void doNGets(final Table table, final byte[][] keys) throws Exception {
|
||||
for (byte[] key : keys) {
|
||||
Result r = table.get(new Get(key));
|
||||
assertArrayEquals(VALUE, r.getValue(HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY));
|
||||
}
|
||||
}
|
||||
|
||||
private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) {
|
||||
assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID],
|
||||
after[RegionInfo.DEFAULT_REPLICA_ID]);
|
||||
|
||||
for (int i = 1; i < after.length; i ++) {
|
||||
assertTrue(after[i] > before[i]);
|
||||
}
|
||||
}
|
||||
|
||||
private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
|
||||
// There are read requests increase for primary meta replica.
|
||||
assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >
|
||||
before[RegionInfo.DEFAULT_REPLICA_ID]);
|
||||
|
||||
// No change for replica regions
|
||||
for (int i = 1; i < after.length; i ++) {
|
||||
assertEquals(before[i], after[i]);
|
||||
}
|
||||
}
|
||||
|
||||
private void primaryMayIncreaseReplicaNoChange(final long[] before, final long[] after) {
|
||||
// For primary meta replica, scan request may increase. No change for replica meta regions.
|
||||
assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >=
|
||||
before[RegionInfo.DEFAULT_REPLICA_ID]);
|
||||
|
||||
// No change for replica regions
|
||||
for (int i = 1; i < after.length; i ++) {
|
||||
assertEquals(before[i], after[i]);
|
||||
}
|
||||
}
|
||||
|
||||
private void getMetaReplicaReadRequests(final Region[] metaRegions, final long[] counters) {
|
||||
int i = 0;
|
||||
for (Region r : metaRegions) {
|
||||
LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
|
||||
counters[i] = r.getReadRequestsCount();
|
||||
i ++;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHBaseMetaReplicaGets() throws Exception {
|
||||
|
||||
TableName tn = TableName.valueOf(this.name.getMethodName());
|
||||
final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
|
||||
long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
|
||||
long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
|
||||
long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
|
||||
long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
|
||||
long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
|
||||
Region userRegion = null;
|
||||
HRegionServer srcRs = null;
|
||||
HRegionServer destRs = null;
|
||||
|
||||
try (Table table = HTU.createTable(tn, HConstants.CATALOG_FAMILY,
|
||||
Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
|
||||
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
|
||||
// load different values
|
||||
HTU.loadTable(table, new byte[][] { HConstants.CATALOG_FAMILY }, VALUE);
|
||||
for (int i = 0; i < NB_SERVERS; i++) {
|
||||
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
|
||||
List<HRegion> onlineRegions = rs.getRegions(tn);
|
||||
if (onlineRegions.size() > 0) {
|
||||
userRegion = onlineRegions.get(0);
|
||||
srcRs = rs;
|
||||
if (i > 0) {
|
||||
destRs = HTU.getMiniHBaseCluster().getRegionServer(0);
|
||||
} else {
|
||||
destRs = HTU.getMiniHBaseCluster().getRegionServer(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicas);
|
||||
|
||||
Configuration c = new Configuration(HTU.getConfiguration());
|
||||
c.set(LOCATOR_META_REPLICAS_MODE, "LoadBalance");
|
||||
Connection connection = ConnectionFactory.createConnection(c);
|
||||
Table tableForGet = connection.getTable(tn);
|
||||
byte[][] getRows = new byte[HBaseTestingUtility.KEYS.length][];
|
||||
|
||||
int i = 0;
|
||||
for (byte[] key : HBaseTestingUtility.KEYS) {
|
||||
getRows[i] = key;
|
||||
i++;
|
||||
}
|
||||
getRows[0] = Bytes.toBytes("aaa");
|
||||
doNGets(tableForGet, getRows);
|
||||
|
||||
getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGet);
|
||||
|
||||
// There is no read requests increase for primary meta replica.
|
||||
// For rest of meta replicas, there are more reads against them.
|
||||
primaryNoChangeReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);
|
||||
|
||||
// move one of regions so it meta cache may be invalid.
|
||||
HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());
|
||||
|
||||
doNGets(tableForGet, getRows);
|
||||
|
||||
getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterMove);
|
||||
|
||||
// There are read requests increase for primary meta replica.
|
||||
// For rest of meta replicas, there is no change as regionMove will tell the new location
|
||||
primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGet,
|
||||
readReqsForMetaReplicasAfterMove);
|
||||
// Move region again.
|
||||
HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
|
||||
|
||||
// Wait until moveRegion cache timeout.
|
||||
while (destRs.getMovedRegion(userRegion.getRegionInfo().getEncodedName()) != null) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterSecondMove);
|
||||
|
||||
// There may be read requests increase for primary meta replica.
|
||||
// For rest of meta replicas, there is no change.
|
||||
primaryMayIncreaseReplicaNoChange(readReqsForMetaReplicasAfterMove,
|
||||
readReqsForMetaReplicasAfterSecondMove);
|
||||
|
||||
doNGets(tableForGet, getRows);
|
||||
|
||||
getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterThirdGet);
|
||||
|
||||
// Since it gets RegionNotServedException, it will go to primary for the next lookup.
|
||||
// There are read requests increase for primary meta replica.
|
||||
// For rest of meta replicas, there is no change.
|
||||
primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterSecondMove,
|
||||
readReqsForMetaReplicasAfterThirdGet);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -536,7 +536,6 @@ public class TestReplicationSource {
|
|||
try {
|
||||
rs.startup();
|
||||
assertTrue(rs.isSourceActive());
|
||||
Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0);
|
||||
Waiter.waitFor(conf, 1000, () -> rss.isAborted());
|
||||
assertTrue(rss.isAborted());
|
||||
Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
|
||||
|
|
Loading…
Reference in New Issue