From 690b4d8973a3cb1c5aab706d67c377b4bd5068fd Mon Sep 17 00:00:00 2001 From: Huaxiang Sun Date: Mon, 2 Nov 2020 09:17:16 -0800 Subject: [PATCH] HBASE-25126 Add load balance logic in hbase-client to distribute read load over meta replica regions It adds load balance support for meta lookup in AsyncTableRegionLocator. The existing meta replica mode is renamed as "HedgedRead", client sends scan request to the primary meta replica region first, if response is not back within a configured amount of time, it will send scan requests to all meta replica regions and take the first response. On top of the existing mode, a new mode "LoadBalance" is introduced. In this mode, client first choose a meta replica region to send scan request. If the response is stale, it may send the request to another meta replica region or the primary region. In this mode, meta scan requests are load balanced across all replica regions with the primary mode as the ultimate source of truth. Two new config knobs are added: 1. hbase.locator.meta.replicas.mode Valid options are "None", "HedgedRead" and "LoadBalance", they are case insensitive. The default mode is "None". 2. hbase.locator.meta.replicas.mode.loadbalance.selector The load balance alogrithm to select a meta replica to send the requests. Only org.apache.hadoop.hbase.client.CatalogReplicaLoadBalanceReplicaSimpleSelector.class is supported for now, which is the default as well. The algorithm works as follows: a. Clients select a randome meta replica region to send the requests if there is no entry for the location in the stale location cache. b. If the location from one meta replica region is stale, a stale entry will be created in the statle location cache for the region. c. Clients select the primary meta region if the location is in the stale location cache. d. The stale location cache entries time out in 3 seconds. If there is no "hbase.locator.meta.replicas.mode" configured, it will check the config knob "hbase.meta.replicas.use". If "hbase.meta.replicas.use" is configured, the mode will be set to "HedgedRead". --- .../hbase/client/AsyncConnectionImpl.java | 22 +- .../client/AsyncNonMetaRegionLocator.java | 70 +++- .../CatalogReplicaLoadBalanceSelector.java | 47 +++ ...alogReplicaLoadBalanceSelectorFactory.java | 49 +++ ...talogReplicaLoadBalanceSimpleSelector.java | 301 ++++++++++++++++ .../hbase/client/CatalogReplicaMode.java | 64 ++++ .../hadoop/hbase/client/RegionLocator.java | 12 + .../RegionReplicaReplicationEndpoint.java | 11 +- .../hbase/client/RegionReplicaTestHelper.java | 15 + .../client/TestAsyncNonMetaRegionLocator.java | 76 +++- ...tMetaRegionReplicaReplicationEndpoint.java | 330 ++++++++++++++++-- .../regionserver/TestReplicationSource.java | 1 - 12 files changed, 949 insertions(+), 49 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelector.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelectorFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSimpleSelector.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaMode.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 406af0d4fdd..fda262cfca3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -116,7 +116,7 @@ class AsyncConnectionImpl implements AsyncConnection { private final Optional stats; private final ClientBackoffPolicy backoffPolicy; - private ChoreService authService; + private ChoreService choreService; private final AtomicBoolean closed = new AtomicBoolean(false); @@ -130,6 +130,7 @@ class AsyncConnectionImpl implements AsyncConnection { SocketAddress localAddress, User user) { this.conf = conf; this.user = user; + if (user.isLoginFromKeytab()) { spawnRenewalChore(user.getUGI()); } @@ -182,8 +183,19 @@ class AsyncConnectionImpl implements AsyncConnection { } private void spawnRenewalChore(final UserGroupInformation user) { - authService = new ChoreService("Relogin service"); - authService.scheduleChore(AuthUtil.getAuthRenewalChore(user)); + ChoreService service = getChoreService(); + service.scheduleChore(AuthUtil.getAuthRenewalChore(user)); + } + + /** + * If choreService has not been created yet, create the ChoreService. + * @return ChoreService + */ + synchronized ChoreService getChoreService() { + if (choreService == null) { + choreService = new ChoreService("AsyncConn Chore Service"); + } + return choreService; } @Override @@ -208,8 +220,8 @@ class AsyncConnectionImpl implements AsyncConnection { IOUtils.closeQuietly(clusterStatusListener); IOUtils.closeQuietly(rpcClient); IOUtils.closeQuietly(registry); - if (authService != null) { - authService.shutdown(); + if (choreService != null) { + choreService.shutdown(); } metrics.ifPresent(MetricsConnection::shutdown); ConnectionOverAsyncConnection c = this.conn; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index b2021688816..a9ee6a9e552 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -30,6 +30,7 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegi import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName; +import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE; import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; @@ -46,6 +47,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.ObjectUtils; import org.apache.hadoop.hbase.CatalogFamilyFormat; import org.apache.hadoop.hbase.HBaseIOException; @@ -89,7 +91,10 @@ class AsyncNonMetaRegionLocator { private final int locatePrefetchLimit; - private final boolean useMetaReplicas; + // The mode tells if HedgedRead, LoadBalance mode is supported. + // The default mode is CatalogReplicaMode.None. + private CatalogReplicaMode metaReplicaMode; + private CatalogReplicaLoadBalanceSelector metaReplicaSelector; private final ConcurrentMap cache = new ConcurrentHashMap<>(); @@ -196,8 +201,41 @@ class AsyncNonMetaRegionLocator { MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE); this.locatePrefetchLimit = conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT); - this.useMetaReplicas = - conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS); + + // Get the region locator's meta replica mode. + this.metaReplicaMode = CatalogReplicaMode.fromString(conn.getConfiguration() + .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString())); + + switch (this.metaReplicaMode) { + case LOAD_BALANCE: + String replicaSelectorClass = conn.getConfiguration(). + get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR, + CatalogReplicaLoadBalanceSimpleSelector.class.getName()); + + this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector( + replicaSelectorClass, META_TABLE_NAME, conn, () -> { + int numOfReplicas = 1; + try { + RegionLocations metaLocations = conn.registry.getMetaRegionLocations().get( + conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS); + numOfReplicas = metaLocations.size(); + } catch (Exception e) { + LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e); + } + return numOfReplicas; + }); + break; + case NONE: + // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config. + boolean useMetaReplicas = conn.getConfiguration().getBoolean(USE_META_REPLICAS, + DEFAULT_USE_META_REPLICAS); + if (useMetaReplicas) { + this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ; + } + break; + default: + // Doing nothing + } } private TableCache getTableCache(TableName tableName) { @@ -433,9 +471,24 @@ class AsyncNonMetaRegionLocator { Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true) .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit) .setReadType(ReadType.PREAD); - if (useMetaReplicas) { - scan.setConsistency(Consistency.TIMELINE); + + switch (this.metaReplicaMode) { + case LOAD_BALANCE: + int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType); + if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) { + // If the selector gives a non-primary meta replica region, then go with it. + // Otherwise, just go to primary in non-hedgedRead mode. + scan.setConsistency(Consistency.TIMELINE); + scan.setReplicaId(metaReplicaId); + } + break; + case HEDGED_READ: + scan.setConsistency(Consistency.TIMELINE); + break; + default: + // do nothing } + conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() { private boolean completeNormally = false; @@ -577,6 +630,13 @@ class AsyncNonMetaRegionLocator { if (!canUpdateOnError(loc, oldLoc)) { return; } + // Tell metaReplicaSelector that the location is stale. It will create a stale entry + // with timestamp internally. Next time the client looks up the same location, + // it will pick a different meta replica region. + if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { + metaReplicaSelector.onError(loc); + } + RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); if (newLocs == null) { if (tableCache.cache.remove(startKey, oldLocs)) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelector.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelector.java new file mode 100644 index 00000000000..f9572b3b63c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelector.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A Catalog replica selector decides which catalog replica to go for read requests when it is + * configured as CatalogReplicaMode.LoadBalance. + */ +@InterfaceAudience.Private +interface CatalogReplicaLoadBalanceSelector { + + /** + * This method is called when input location is stale, i.e, when clients run into + * org.apache.hadoop.hbase.NotServingRegionException. + * @param loc stale location + */ + void onError(HRegionLocation loc); + + /** + * Select a catalog replica region where client go to loop up the input row key. + * + * @param tablename table name + * @param row key to look up + * @param locateType locate type + * @return replica id + */ + int select(TableName tablename, byte[] row, RegionLocateType locateType); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelectorFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelectorFactory.java new file mode 100644 index 00000000000..1570e7f8a06 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelectorFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.function.IntSupplier; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Factory to create a {@link CatalogReplicaLoadBalanceSelector} + */ +@InterfaceAudience.Private +final class CatalogReplicaLoadBalanceSelectorFactory { + /** + * Private Constructor + */ + private CatalogReplicaLoadBalanceSelectorFactory() { + } + + /** + * Create a CatalogReplicaLoadBalanceReplicaSelector based on input config. + * @param replicaSelectorClass Selector classname. + * @param tableName System table name. + * @param conn {@link AsyncConnectionImpl} + * @return {@link CatalogReplicaLoadBalanceSelector} + */ + public static CatalogReplicaLoadBalanceSelector createSelector(String replicaSelectorClass, + TableName tableName, AsyncConnectionImpl conn, IntSupplier getReplicaCount) { + return ReflectionUtils.instantiateWithCustomCtor(replicaSelectorClass, + new Class[] { TableName.class, AsyncConnectionImpl.class, IntSupplier.class }, + new Object[] { tableName, conn, getReplicaCount }); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSimpleSelector.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSimpleSelector.java new file mode 100644 index 00000000000..ccd74120020 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSimpleSelector.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; +import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; +import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.IntSupplier; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +/** + *

CatalogReplicaLoadBalanceReplicaSimpleSelector implements a simple catalog replica load + * balancing algorithm. It maintains a stale location cache for each table. Whenever client looks + * up location, it first check if the row is the stale location cache. If yes, the location from + * catalog replica is stale, it will go to the primary region to look up update-to-date location; + * otherwise, it will randomly pick up a replica region for lookup. When clients receive + * RegionNotServedException from region servers, it will add these region locations to the stale + * location cache. The stale cache will be cleaned up periodically by a chore.

+ * + * It follows a simple algorithm to choose a replica to go: + * + *
    + *
  1. If there is no stale location entry for rows it looks up, it will randomly + * pick a replica region to do lookup.
  2. + *
  3. If the location from the replica region is stale, client gets RegionNotServedException + * from region server, in this case, it will create StaleLocationCacheEntry in + * CatalogReplicaLoadBalanceReplicaSimpleSelector.
  4. + *
  5. When client tries to do location lookup, it checks StaleLocationCache first for rows it + * tries to lookup, if entry exists, it will go with primary meta region to do lookup; + * otherwise, it will follow step 1.
  6. + *
  7. A chore will periodically run to clean up cache entries in the StaleLocationCache.
  8. + *
+ */ +class CatalogReplicaLoadBalanceSimpleSelector implements + CatalogReplicaLoadBalanceSelector, Stoppable { + private static final Logger LOG = + LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class); + private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds + private final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds + private final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute + + /** + * StaleLocationCacheEntry is the entry when a stale location is reported by an client. + */ + private static final class StaleLocationCacheEntry { + // timestamp in milliseconds + private final long timestamp; + + private final byte[] endKey; + + StaleLocationCacheEntry(final byte[] endKey) { + this.endKey = endKey; + timestamp = EnvironmentEdgeManager.currentTime(); + } + + public byte[] getEndKey() { + return this.endKey; + } + + public long getTimestamp() { + return this.timestamp; + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("endKey", endKey) + .append("timestamp", timestamp) + .toString(); + } + } + + private final ConcurrentMap> + staleCache = new ConcurrentHashMap<>(); + private volatile int numOfReplicas; + private final AsyncConnectionImpl conn; + private final TableName tableName; + private final IntSupplier getNumOfReplicas; + private volatile boolean isStopped = false; + private final static int UNINITIALIZED_NUM_OF_REPLICAS = -1; + + CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, AsyncConnectionImpl conn, + IntSupplier getNumOfReplicas) { + this.conn = conn; + this.tableName = tableName; + this.getNumOfReplicas = getNumOfReplicas; + + // This numOfReplicas is going to be lazy initialized. + this.numOfReplicas = UNINITIALIZED_NUM_OF_REPLICAS; + // Start chores + this.conn.getChoreService().scheduleChore(getCacheCleanupChore(this)); + this.conn.getChoreService().scheduleChore(getRefreshReplicaCountChore(this)); + } + + /** + * When a client runs into RegionNotServingException, it will call this method to + * update Selector's internal state. + * @param loc the location which causes exception. + */ + public void onError(HRegionLocation loc) { + ConcurrentNavigableMap tableCache = + computeIfAbsent(staleCache, loc.getRegion().getTable(), + () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR)); + byte[] startKey = loc.getRegion().getStartKey(); + tableCache.putIfAbsent(startKey, + new StaleLocationCacheEntry(loc.getRegion().getEndKey())); + LOG.debug("Add entry to stale cache for table {} with startKey {}, {}", + loc.getRegion().getTable(), startKey, loc.getRegion().getEndKey()); + } + + /** + * Select an random replica id. In case there is no replica region configured, return + * the primary replica id. + * @return Replica id + */ + private int getRandomReplicaId() { + int cachedNumOfReplicas = this.numOfReplicas; + if (cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) { + cachedNumOfReplicas = refreshCatalogReplicaCount(); + this.numOfReplicas = cachedNumOfReplicas; + } + // In case of no replica configured, return the primary region id. + if (cachedNumOfReplicas <= 1) { + return RegionInfo.DEFAULT_REPLICA_ID; + } + return 1 + ThreadLocalRandom.current().nextInt(cachedNumOfReplicas - 1); + } + + /** + * When it looks up a location, it will call this method to find a replica region to go. + * For a normal case, > 99% of region locations from catalog/meta replica will be up to date. + * In extreme cases such as region server crashes, it will depends on how fast replication + * catches up. + * + * @param tablename table name it looks up + * @param row key it looks up. + * @param locateType locateType, Only BEFORE and CURRENT will be passed in. + * @return catalog replica id + */ + public int select(final TableName tablename, final byte[] row, + final RegionLocateType locateType) { + Preconditions.checkArgument(locateType == RegionLocateType.BEFORE || + locateType == RegionLocateType.CURRENT, + "Expected type BEFORE or CURRENT but got: %s", locateType); + + ConcurrentNavigableMap tableCache = staleCache.get(tablename); + + // If there is no entry in StaleCache, select a random replica id. + if (tableCache == null) { + return getRandomReplicaId(); + } + + Map.Entry entry; + boolean isEmptyStopRow = isEmptyStopRow(row); + // Only BEFORE and CURRENT are passed in. + if (locateType == RegionLocateType.BEFORE) { + entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row); + } else { + entry = tableCache.floorEntry(row); + } + + // It is not in the stale cache, return a random replica id. + if (entry == null) { + return getRandomReplicaId(); + } + + // The entry here is a possible match for the location. Check if the entry times out first as + // long comparing is faster than comparing byte arrays(in most cases). It could remove + // stale entries faster. If the possible match entry does not time out, it will check if + // the entry is a match for the row passed in and select the replica id accordingly. + if ((EnvironmentEdgeManager.currentTime() - entry.getValue().getTimestamp()) >= + STALE_CACHE_TIMEOUT_IN_MILLISECONDS) { + LOG.debug("Entry for table {} with startKey {}, {} times out", tablename, entry.getKey(), + entry); + tableCache.remove(entry.getKey()); + return getRandomReplicaId(); + } + + byte[] endKey = entry.getValue().getEndKey(); + + // The following logic is borrowed from AsyncNonMetaRegionLocator. + if (isEmptyStopRow(endKey)) { + LOG.debug("Lookup {} goes to primary region", row); + return RegionInfo.DEFAULT_REPLICA_ID; + } + + if (locateType == RegionLocateType.BEFORE) { + if (!isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0) { + LOG.debug("Lookup {} goes to primary meta", row); + return RegionInfo.DEFAULT_REPLICA_ID; + } + } else { + if (Bytes.compareTo(row, endKey) < 0) { + LOG.debug("Lookup {} goes to primary meta", row); + return RegionInfo.DEFAULT_REPLICA_ID; + } + } + + // Not in stale cache, return a random replica id. + return getRandomReplicaId(); + } + + // This class implements the Stoppable interface as chores needs a Stopable object, there is + // no-op on this Stoppable object currently. + @Override + public void stop(String why) { + isStopped = true; + } + + @Override + public boolean isStopped() { + return isStopped; + } + + private void cleanupReplicaReplicaStaleCache() { + long curTimeInMills = EnvironmentEdgeManager.currentTime(); + for (ConcurrentNavigableMap tableCache : staleCache.values()) { + Iterator> it = + tableCache.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + if (curTimeInMills - entry.getValue().getTimestamp() >= + STALE_CACHE_TIMEOUT_IN_MILLISECONDS) { + LOG.debug("clean entry {}, {} from stale cache", entry.getKey(), entry.getValue()); + it.remove(); + } + } + } + } + + private int refreshCatalogReplicaCount() { + int newNumOfReplicas = this.getNumOfReplicas.getAsInt(); + LOG.debug("Refreshed replica count {}", newNumOfReplicas); + if (newNumOfReplicas == 1) { + LOG.warn("Table {}'s region replica count is 1, maybe a misconfiguration or failure to " + + "fetch the replica count", tableName); + } + int cachedNumOfReplicas = this.numOfReplicas; + + // If the returned number of replicas is 1, it is mostly caused by failure to fetch the + // replica count. Do not update the numOfReplicas in this case. + if ((cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) || + ((cachedNumOfReplicas != newNumOfReplicas) && (newNumOfReplicas != 1))) { + this.numOfReplicas = newNumOfReplicas; + } + return newNumOfReplicas; + } + + private ScheduledChore getCacheCleanupChore( + final CatalogReplicaLoadBalanceSimpleSelector selector) { + return new ScheduledChore("CleanupCatalogReplicaStaleCache", this, + STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) { + @Override + protected void chore() { + selector.cleanupReplicaReplicaStaleCache(); + } + }; + } + + private ScheduledChore getRefreshReplicaCountChore( + final CatalogReplicaLoadBalanceSimpleSelector selector) { + return new ScheduledChore("RefreshReplicaCountChore", this, + REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) { + @Override + protected void chore() { + selector.refreshCatalogReplicaCount(); + } + }; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaMode.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaMode.java new file mode 100644 index 00000000000..0f126e1139c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaMode.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + *

There are two modes with catalog replica support.

+ * + *
    + *
  1. HEDGED_READ - Client sends requests to the primary region first, within a + * configured amount of time, if there is no response coming back, + * client sends requests to all replica regions and takes the first + * response.
  2. + * + *
  3. LOAD_BALANCE - Client sends requests to replica regions in a round-robin mode, + * if results from replica regions are stale, next time, client sends requests for + * these stale locations to the primary region. In this mode, scan + * requests are load balanced across all replica regions.
  4. + *
+ */ +@InterfaceAudience.Private +enum CatalogReplicaMode { + NONE { + @Override + public String toString() { + return "None"; + } + }, + HEDGED_READ { + @Override + public String toString() { + return "HedgedRead"; + } + }, + LOAD_BALANCE { + @Override + public String toString() { + return "LoadBalance"; + } + }; + + public static CatalogReplicaMode fromString(final String value) { + for(CatalogReplicaMode mode : values()) { + if (mode.toString().equalsIgnoreCase(value)) { + return mode; + } + } + throw new IllegalArgumentException(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java index c7440c67040..7ea6e4ada36 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java @@ -37,6 +37,18 @@ import org.apache.hadoop.hbase.util.Pair; */ @InterfaceAudience.Public public interface RegionLocator extends Closeable { + + /** Configuration for Region Locator's mode when meta replica is configured. + * Valid values are: HedgedRead, LoadBalance, None + */ + String LOCATOR_META_REPLICAS_MODE = "hbase.locator.meta.replicas.mode"; + + /** Configuration for meta replica selector when Region Locator's LoadBalance mode is configured. + * The default value is org.apache.hadoop.hbase.client.CatalogReplicaLoadBalanceSimpleSelector. + */ + String LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR = + "hbase.locator.meta.replicas.mode.loadbalance.selector"; + /** * Finds the region on which the given row is being served. Does not reload the cache. * @param row Row to find. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index 70a9280e553..f8eca6cc1c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -182,7 +182,16 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { private void replicate(CompletableFuture future, RegionLocations locs, TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List entries) { if (locs.size() == 1) { - // Could this happen? + LOG.info("Only one location for {}.{}, refresh the location cache only for meta now", + tableDesc.getTableName(), Bytes.toString(encodedRegionName)); + + // This could happen to meta table. In case of meta table comes with no replica and + // later it is changed to multiple replicas. The cached location for meta may only has + // the primary region. In this case, it needs to clean up and refresh the cached meta + // locations. + if (tableDesc.isMergeEnabled()) { + connection.getRegionLocator(tableDesc.getTableName()).clearRegionLocationCache(); + } future.complete(Long.valueOf(entries.size())); return; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java index a2466a5cd7f..989fdbb51ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java @@ -142,6 +142,21 @@ public final class RegionReplicaTestHelper { locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false) .getDefaultRegionLocation().getServerName()); // should get the new location when reload = true + // when meta replica LoadBalance mode is enabled, it may delay a bit. + util.waitFor(3000, new ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + ServerName sn = locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, + true).getDefaultRegionLocation().getServerName(); + return newServerName.equals(sn); + } + + @Override + public String explainFailure() throws Exception { + return "New location does not show up in meta (replica) region"; + } + }); + assertEquals(newServerName, locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true) .getDefaultRegionLocation().getServerName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index d8388de380a..b147d9120f6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -29,12 +29,14 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.IntStream; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionLocation; @@ -49,41 +51,63 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Category({ MediumTests.class, ClientTests.class }) +@RunWith(Parameterized.class) public class TestAsyncNonMetaRegionLocator { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class); + private static final Logger LOG = LoggerFactory.getLogger(TestAsyncNonMetaRegionLocator.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static TableName TABLE_NAME = TableName.valueOf("async"); private static byte[] FAMILY = Bytes.toBytes("cf"); + private static final int META_STOREFILE_REFRESH_PERIOD = 100; + private static final int NB_SERVERS = 4; + private static int numOfMetaReplica = NB_SERVERS - 1; private static AsyncConnectionImpl CONN; private static AsyncNonMetaRegionLocator LOCATOR; + private static ConnectionRegistry registry; private static byte[][] SPLIT_KEYS; + private CatalogReplicaMode metaReplicaMode; @BeforeClass public static void setUp() throws Exception { - TEST_UTIL.startMiniCluster(3); - TEST_UTIL.getAdmin().balancerSwitch(false, true); - ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); - CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, - registry.getClusterId().get(), null, User.getCurrent()); - LOCATOR = new AsyncNonMetaRegionLocator(CONN); + Configuration conf = TEST_UTIL.getConfiguration(); + // Enable hbase:meta replication. + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true); + conf.setLong("replication.source.sleepforretries", 10); // 10 ms + + TEST_UTIL.startMiniCluster(NB_SERVERS); + Admin admin = TEST_UTIL.getAdmin(); + admin.balancerSwitch(false, true); + + // Enable hbase:meta replication. + HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, numOfMetaReplica); + TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster().getRegions( + TableName.META_TABLE_NAME).size() >= numOfMetaReplica); + + registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + SPLIT_KEYS = new byte[8][]; for (int i = 111; i < 999; i += 111) { SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); @@ -108,6 +132,27 @@ public class TestAsyncNonMetaRegionLocator { LOCATOR.clearCache(TABLE_NAME); } + @Parameterized.Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][] { + { null }, + { CatalogReplicaMode.LOAD_BALANCE.toString() } + }); + } + + public TestAsyncNonMetaRegionLocator(String clientMetaReplicaMode) throws Exception { + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + // Enable meta replica LoadBalance mode for this connection. + if (clientMetaReplicaMode != null) { + c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, clientMetaReplicaMode); + metaReplicaMode = CatalogReplicaMode.fromString(clientMetaReplicaMode); + } + + CONN = new AsyncConnectionImpl(c, registry, + registry.getClusterId().get(), null, User.getCurrent()); + LOCATOR = new AsyncNonMetaRegionLocator(CONN); + } + private void createSingleRegionTable() throws IOException, InterruptedException { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); @@ -347,8 +392,21 @@ public class TestAsyncNonMetaRegionLocator { getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); } // should get the new location when reload = true - assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, - getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get()); + // when meta replica LoadBalance mode is enabled, it may delay a bit. + TEST_UTIL.waitFor(3000, new ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + HRegionLocation loc = getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, + RegionLocateType.CURRENT, true).get(); + return newServerName.equals(loc.getServerName()); + } + + @Override + public String explainFailure() throws Exception { + return "New location does not show up in meta (replica) region"; + } + }); + // the cached location should be replaced for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java index 694b7a18c92..c68a7f1b1a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -39,6 +41,10 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -63,17 +69,20 @@ import org.slf4j.LoggerFactory; /** * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and * verifying async wal replication replays the edits to the secondary region in various scenarios. + * * @see TestRegionReplicaReplicationEndpoint */ @Category({LargeTests.class}) public class TestMetaRegionReplicaReplicationEndpoint { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class); + HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class); private static final Logger LOG = - LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class); - private static final int NB_SERVERS = 3; - private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class); + private static final int NB_SERVERS = 4; + private final HBaseTestingUtility HTU = new HBaseTestingUtility(); + private int numOfMetaReplica = NB_SERVERS - 1; + private static byte[] VALUE = Bytes.toBytes("value"); @Rule public TestName name = new TestName(); @@ -93,12 +102,17 @@ public class TestMetaRegionReplicaReplicationEndpoint { conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); // Enable hbase:meta replication. - conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true); + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, + true); // Set hbase:meta replicas to be 3. - conf.setInt(HConstants.META_REPLICAS_NUM, NB_SERVERS); + // conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica); HTU.startMiniCluster(NB_SERVERS); + // Enable hbase:meta replication. + HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica); + HTU.waitFor(30000, - () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= NB_SERVERS); + () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() + >= numOfMetaReplica); } @After @@ -129,7 +143,7 @@ public class TestMetaRegionReplicaReplicationEndpoint { assertNotNull(hrsOther); assertFalse(isMetaRegionReplicaReplicationSource(hrsOther)); Region meta = null; - for (Region region: hrs.getOnlineRegionsLocalContext()) { + for (Region region : hrs.getOnlineRegionsLocalContext()) { if (region.getRegionInfo().isMetaRegion()) { meta = region; break; @@ -157,8 +171,8 @@ public class TestMetaRegionReplicaReplicationEndpoint { private void testHBaseMetaReplicatesOneRow(int i) throws Exception { waitForMetaReplicasToOnline(); try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_" + i), - HConstants.CATALOG_FAMILY)) { - verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName())); + HConstants.CATALOG_FAMILY)) { + verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName())); } } @@ -176,18 +190,136 @@ public class TestMetaRegionReplicaReplicationEndpoint { */ @Test public void testHBaseMetaReplicates() throws Exception { - try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"), - HConstants.CATALOG_FAMILY, - Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) { - verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName())); + try (Table table = HTU + .createTable(TableName.valueOf(this.name.getMethodName() + "_0"), HConstants.CATALOG_FAMILY, + Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) { + verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName())); } - try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"), - HConstants.CATALOG_FAMILY, - Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) { - verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName())); + try (Table table = HTU + .createTable(TableName.valueOf(this.name.getMethodName() + "_1"), HConstants.CATALOG_FAMILY, + Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) { + verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName())); // Try delete. HTU.deleteTableIfAny(table.getName()); - verifyDeletedReplication(TableName.META_TABLE_NAME, NB_SERVERS, table.getName()); + verifyDeletedReplication(TableName.META_TABLE_NAME, numOfMetaReplica, table.getName()); + } + } + + @Test + public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception { + Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); + TableName tableName = TableName.valueOf("hbase:meta"); + Table table = connection.getTable(tableName); + try { + // load the data to the table + for (int i = 0; i < 5; i++) { + LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000)); + HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000); + LOG.info("flushing table"); + HTU.flush(tableName); + LOG.info("compacting table"); + if (i < 4) { + HTU.compact(tableName, false); + } + } + + verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY); + } finally { + table.close(); + connection.close(); + } + } + + @Test + public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception { + MiniHBaseCluster cluster = HTU.getMiniHBaseCluster(); + HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta()); + + HRegionServer hrsMetaReplica = null; + HRegionServer hrsNoMetaReplica = null; + HRegionServer server = null; + Region metaReplica = null; + boolean hostingMeta; + + for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) { + server = cluster.getRegionServer(i); + hostingMeta = false; + if (server == hrs) { + continue; + } + for (Region region : server.getOnlineRegionsLocalContext()) { + if (region.getRegionInfo().isMetaRegion()) { + if (metaReplica == null) { + metaReplica = region; + } + hostingMeta = true; + break; + } + } + if (!hostingMeta) { + hrsNoMetaReplica = server; + } + } + + Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); + TableName tableName = TableName.valueOf("hbase:meta"); + Table table = connection.getTable(tableName); + try { + // load the data to the table + for (int i = 0; i < 5; i++) { + LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000)); + HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000); + if (i == 0) { + HTU.moveRegionAndWait(metaReplica.getRegionInfo(), hrsNoMetaReplica.getServerName()); + } + } + + verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY); + } finally { + table.close(); + connection.close(); + } + } + + protected void verifyReplication(TableName tableName, int regionReplication, final int startRow, + final int endRow, final byte[] family) throws Exception { + verifyReplication(tableName, regionReplication, startRow, endRow, family, true); + } + + private void verifyReplication(TableName tableName, int regionReplication, final int startRow, + final int endRow, final byte[] family, final boolean present) throws Exception { + // find the regions + final Region[] regions = new Region[regionReplication]; + + for (int i = 0; i < NB_SERVERS; i++) { + HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); + List onlineRegions = rs.getRegions(tableName); + for (HRegion region : onlineRegions) { + regions[region.getRegionInfo().getReplicaId()] = region; + } + } + + for (Region region : regions) { + assertNotNull(region); + } + + for (int i = 1; i < regionReplication; i++) { + final Region region = regions[i]; + // wait until all the data is replicated to all secondary regions + Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + LOG.info("verifying replication for region replica:" + region.getRegionInfo()); + try { + HTU.verifyNumericRows(region, family, startRow, endRow, present); + } catch (Throwable ex) { + LOG.warn("Verification from secondary region is not complete yet", ex); + // still wait + return false; + } + return true; + } + }); } } @@ -201,10 +333,10 @@ public class TestMetaRegionReplicaReplicationEndpoint { // getRegionLocations returns an entry for each replica but if unassigned, entry is null. // Pass reload to force us to skip cache else it just keeps returning default. () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream(). - filter(Objects::nonNull).count() >= NB_SERVERS); + filter(Objects::nonNull).count() >= numOfMetaReplica); List locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW); LOG.info("Found locations {}", locations); - assertEquals(NB_SERVERS, locations.size()); + assertEquals(numOfMetaReplica, locations.size()); } /** @@ -213,7 +345,8 @@ public class TestMetaRegionReplicaReplicationEndpoint { private List getMetaCells(TableName tableName) throws IOException { final List results = new ArrayList<>(); ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() { - @Override public boolean visit(Result r) throws IOException { + @Override + public boolean visit(Result r) throws IOException { results.add(r); return true; } @@ -225,7 +358,7 @@ public class TestMetaRegionReplicaReplicationEndpoint { /** * @return All Regions for tableName including Replicas. */ - private Region [] getAllRegions(TableName tableName, int replication) { + private Region[] getAllRegions(TableName tableName, int replication) { final Region[] regions = new Region[replication]; for (int i = 0; i < NB_SERVERS; i++) { HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); @@ -240,12 +373,23 @@ public class TestMetaRegionReplicaReplicationEndpoint { return regions; } + private Region getOneRegion(TableName tableName) { + for (int i = 0; i < NB_SERVERS; i++) { + HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); + List onlineRegions = rs.getRegions(tableName); + if (onlineRegions.size() > 1) { + return onlineRegions.get(0); + } + } + return null; + } + /** * Verify when a Table is deleted from primary, then there are no references in replicas * (because they get the delete of the table rows too). */ private void verifyDeletedReplication(TableName tableName, int regionReplication, - final TableName deletedTableName) { + final TableName deletedTableName) { final Region[] regions = getAllRegions(tableName, regionReplication); // Start count at '1' so we skip default, primary replica and only look at secondaries. @@ -262,7 +406,7 @@ public class TestMetaRegionReplicaReplicationEndpoint { continue; } return doesNotContain(cells, deletedTableName); - } catch(Throwable ex) { + } catch (Throwable ex) { LOG.warn("Verification from secondary region is not complete yet", ex); // still wait return false; @@ -278,7 +422,7 @@ public class TestMetaRegionReplicaReplicationEndpoint { * cells. */ private boolean doesNotContain(List cells, TableName tableName) { - for (Cell cell: cells) { + for (Cell cell : cells) { String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) { return false; @@ -291,7 +435,7 @@ public class TestMetaRegionReplicaReplicationEndpoint { * Verify Replicas have results (exactly). */ private void verifyReplication(TableName tableName, int regionReplication, - List contains) { + List contains) { final Region[] regions = getAllRegions(tableName, regionReplication); // Start count at '1' so we skip default, primary replica and only look at secondaries. @@ -308,7 +452,7 @@ public class TestMetaRegionReplicaReplicationEndpoint { continue; } return contains(contains, cells); - } catch(Throwable ex) { + } catch (Throwable ex) { LOG.warn("Verification from secondary region is not complete yet", ex); // still wait return false; @@ -338,4 +482,134 @@ public class TestMetaRegionReplicaReplicationEndpoint { } return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size(); } + + private void doNGets(final Table table, final byte[][] keys) throws Exception { + for (byte[] key : keys) { + Result r = table.get(new Get(key)); + assertArrayEquals(VALUE, r.getValue(HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY)); + } + } + + private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) { + assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID], + after[RegionInfo.DEFAULT_REPLICA_ID]); + + for (int i = 1; i < after.length; i ++) { + assertTrue(after[i] > before[i]); + } + } + + private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) { + // There are read requests increase for primary meta replica. + assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > + before[RegionInfo.DEFAULT_REPLICA_ID]); + + // No change for replica regions + for (int i = 1; i < after.length; i ++) { + assertEquals(before[i], after[i]); + } + } + + private void getMetaReplicaReadRequests(final Region[] metaRegions, final long[] counters) { + int i = 0; + for (Region r : metaRegions) { + LOG.info("read request for region {} is {}", r, r.getReadRequestsCount()); + counters[i] = r.getReadRequestsCount(); + i ++; + } + } + + @Test + public void testHBaseMetaReplicaGets() throws Exception { + + TableName tn = TableName.valueOf(this.name.getMethodName()); + final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica); + long[] readReqsForMetaReplicas = new long[numOfMetaReplica]; + long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica]; + long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica]; + long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica]; + long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica]; + Region userRegion = null; + HRegionServer srcRs = null; + HRegionServer destRs = null; + + try (Table table = HTU.createTable(tn, HConstants.CATALOG_FAMILY, + Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) { + verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName())); + // load different values + HTU.loadTable(table, new byte[][] { HConstants.CATALOG_FAMILY }, VALUE); + for (int i = 0; i < NB_SERVERS; i++) { + HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); + List onlineRegions = rs.getRegions(tn); + if (onlineRegions.size() > 0) { + userRegion = onlineRegions.get(0); + srcRs = rs; + if (i > 0) { + destRs = HTU.getMiniHBaseCluster().getRegionServer(0); + } else { + destRs = HTU.getMiniHBaseCluster().getRegionServer(1); + } + } + } + + getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicas); + + Configuration c = new Configuration(HTU.getConfiguration()); + c.setBoolean(HConstants.USE_META_REPLICAS, true); + c.set(LOCATOR_META_REPLICAS_MODE, "LoadBalance"); + Connection connection = ConnectionFactory.createConnection(c); + Table tableForGet = connection.getTable(tn); + byte[][] getRows = new byte[HBaseTestingUtility.KEYS.length][]; + + int i = 0; + for (byte[] key : HBaseTestingUtility.KEYS) { + getRows[i] = key; + i++; + } + getRows[0] = Bytes.toBytes("aaa"); + doNGets(tableForGet, getRows); + + getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGet); + + // There is no read requests increase for primary meta replica. + // For rest of meta replicas, there are more reads against them. + primaryNoChangeReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet); + + // move one of regions so it meta cache may be invalid. + HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName()); + + doNGets(tableForGet, getRows); + + getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterMove); + + // There are read requests increase for primary meta replica. + // For rest of meta replicas, there is no change as regionMove will tell the new location + primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGet, + readReqsForMetaReplicasAfterMove); + // Move region again. + HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName()); + + // Wait until moveRegion cache timeout. + while (destRs.getMovedRegion(userRegion.getRegionInfo().getEncodedName()) != null) { + Thread.sleep(1000); + } + + getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterSecondMove); + + // There are read requests increase for primary meta replica. + // For rest of meta replicas, there is no change. + primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterMove, + readReqsForMetaReplicasAfterSecondMove); + + doNGets(tableForGet, getRows); + + getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterThirdGet); + + // Since it gets RegionNotServedException, it will go to primary for the next lookup. + // There are read requests increase for primary meta replica. + // For rest of meta replicas, there is no change. + primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterSecondMove, + readReqsForMetaReplicasAfterThirdGet); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 99f0ac6670f..796c0e3b18c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -533,7 +533,6 @@ public class TestReplicationSource { try { rs.startup(); assertTrue(rs.isSourceActive()); - Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0); Waiter.waitFor(conf, 1000, () -> rss.isAborted()); assertTrue(rss.isAborted()); Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());