HBASE-26649 Support meta replica LoadBalance mode for RegionLocator#getAllRegionLocations() (#4442) (#4464)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
f97af48d30
commit
672b0a5f5c
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
|
||||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -29,6 +30,7 @@ import java.util.NavigableMap;
|
|||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.SortedMap;
|
import java.util.SortedMap;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -295,7 +297,37 @@ public class AsyncMetaTableAccessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
CompletableFuture<Void> future = new CompletableFuture<Void>();
|
CompletableFuture<Void> future = new CompletableFuture<Void>();
|
||||||
|
// Get the region locator's meta replica mode.
|
||||||
|
CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(metaTable.getConfiguration()
|
||||||
|
.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
|
||||||
|
|
||||||
|
if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
|
||||||
|
addListener(metaTable.getDescriptor(), (desc, error) -> {
|
||||||
|
if (error != null) {
|
||||||
|
LOG.error("Failed to get meta table descriptor, error: ", error);
|
||||||
|
future.completeExceptionally(error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int numOfReplicas = desc.getRegionReplication();
|
||||||
|
if (numOfReplicas > 1) {
|
||||||
|
int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas);
|
||||||
|
|
||||||
|
// When the replicaId is 0, do not set to Consistency.TIMELINE
|
||||||
|
if (replicaId > 0) {
|
||||||
|
scan.setReplicaId(replicaId);
|
||||||
|
scan.setConsistency(Consistency.TIMELINE);
|
||||||
|
}
|
||||||
|
}
|
||||||
metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
|
metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
if (metaReplicaMode == CatalogReplicaMode.HEDGED_READ) {
|
||||||
|
scan.setConsistency(Consistency.TIMELINE);
|
||||||
|
}
|
||||||
|
metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
|
||||||
|
}
|
||||||
|
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
@ -33,7 +33,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||||||
* </ol>
|
* </ol>
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
enum CatalogReplicaMode {
|
public enum CatalogReplicaMode {
|
||||||
NONE {
|
NONE {
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
@ -33,6 +33,7 @@ import java.util.Map;
|
|||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
import java.util.SortedMap;
|
import java.util.SortedMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -697,15 +698,21 @@ public class MetaTableAccessor {
|
|||||||
scanMeta(connection, null, null, QueryType.ALL, v);
|
scanMeta(connection, null, null, QueryType.ALL, v);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
|
||||||
|
TableName tableName, CatalogReplicaMode metaReplicaMode) throws IOException {
|
||||||
|
scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor, metaReplicaMode);
|
||||||
|
}
|
||||||
|
|
||||||
public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
|
public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
|
||||||
TableName tableName) throws IOException {
|
TableName tableName) throws IOException {
|
||||||
scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor);
|
scanMetaForTableRegions(connection, visitor, tableName, CatalogReplicaMode.NONE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
|
private static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
|
||||||
final Visitor visitor) throws IOException {
|
final Visitor visitor, CatalogReplicaMode metaReplicaMode) throws IOException {
|
||||||
scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type),
|
scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type),
|
||||||
type, maxRows, visitor);
|
type, null, maxRows, visitor, metaReplicaMode);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
|
private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
|
||||||
@ -749,12 +756,12 @@ public class MetaTableAccessor {
|
|||||||
static void scanMeta(Connection connection, @Nullable final byte[] startRow,
|
static void scanMeta(Connection connection, @Nullable final byte[] startRow,
|
||||||
@Nullable final byte[] stopRow, QueryType type, int maxRows, final Visitor visitor)
|
@Nullable final byte[] stopRow, QueryType type, int maxRows, final Visitor visitor)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor);
|
scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor, CatalogReplicaMode.NONE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
|
private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
|
||||||
@Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
|
@Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
|
||||||
final Visitor visitor) throws IOException {
|
final Visitor visitor, CatalogReplicaMode metaReplicaMode) throws IOException {
|
||||||
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
|
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
|
||||||
Scan scan = getMetaScan(connection.getConfiguration(), rowUpperLimit);
|
Scan scan = getMetaScan(connection.getConfiguration(), rowUpperLimit);
|
||||||
|
|
||||||
@ -779,6 +786,25 @@ public class MetaTableAccessor {
|
|||||||
|
|
||||||
int currentRow = 0;
|
int currentRow = 0;
|
||||||
try (Table metaTable = getMetaHTable(connection)) {
|
try (Table metaTable = getMetaHTable(connection)) {
|
||||||
|
switch (metaReplicaMode) {
|
||||||
|
case LOAD_BALANCE:
|
||||||
|
int numOfReplicas = metaTable.getDescriptor().getRegionReplication();
|
||||||
|
if (numOfReplicas > 1) {
|
||||||
|
int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas);
|
||||||
|
|
||||||
|
// When the replicaId is 0, do not set to Consistency.TIMELINE
|
||||||
|
if (replicaId > 0) {
|
||||||
|
scan.setReplicaId(replicaId);
|
||||||
|
scan.setConsistency(Consistency.TIMELINE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case HEDGED_READ:
|
||||||
|
scan.setConsistency(Consistency.TIMELINE);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
try (ResultScanner scanner = metaTable.getScanner(scan)) {
|
try (ResultScanner scanner = metaTable.getScanner(scan)) {
|
||||||
Result data;
|
Result data;
|
||||||
while ((data = scanner.next()) != null) {
|
while ((data = scanner.next()) != null) {
|
||||||
@ -2056,7 +2082,7 @@ public class MetaTableAccessor {
|
|||||||
new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
|
new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
|
||||||
list.add(RegionInfo.encodeRegionName(r.getRow()));
|
list.add(RegionInfo.encodeRegionName(r.getRow()));
|
||||||
return true;
|
return true;
|
||||||
});
|
}, CatalogReplicaMode.NONE);
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
|
|||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.commons.lang3.ObjectUtils;
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
|
import org.apache.hadoop.hbase.CatalogReplicaMode;
|
||||||
import org.apache.hadoop.hbase.HBaseIOException;
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
@ -54,6 +54,7 @@ import java.util.function.Supplier;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.AuthUtil;
|
import org.apache.hadoop.hbase.AuthUtil;
|
||||||
|
import org.apache.hadoop.hbase.CatalogReplicaMode;
|
||||||
import org.apache.hadoop.hbase.ChoreService;
|
import org.apache.hadoop.hbase.ChoreService;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseServerException;
|
import org.apache.hadoop.hbase.HBaseServerException;
|
||||||
|
@ -31,6 +31,7 @@ import java.util.Objects;
|
|||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import org.apache.hadoop.hbase.CatalogReplicaMode;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||||
@ -143,7 +144,9 @@ public class HRegionLocator implements RegionLocator {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
MetaTableAccessor.scanMetaForTableRegions(connection, visitor, tableName);
|
CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(connection.getConfiguration()
|
||||||
|
.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
|
||||||
|
MetaTableAccessor.scanMetaForTableRegions(connection, visitor, tableName, metaReplicaMode);
|
||||||
return regions;
|
return regions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,6 +37,7 @@ import java.util.concurrent.ExecutionException;
|
|||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CatalogReplicaMode;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
@ -516,11 +516,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void primaryIncreaseReplicaIncrease(final long[] before, final long[] after) {
|
private void primaryIncreaseReplicaIncrease(final long[] before, final long[] after) {
|
||||||
// There are read requests increase for primary meta replica.
|
// There are read requests increase for all meta replica regions,
|
||||||
assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
|
for (int i = 0; i < after.length; i++) {
|
||||||
|
|
||||||
// There are read requests incrase for meta replica regions.
|
|
||||||
for (int i = 1; i < after.length; i++) {
|
|
||||||
assertTrue(after[i] > before[i]);
|
assertTrue(after[i] > before[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -541,6 +538,7 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||||||
final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
|
final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
|
||||||
long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
|
long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
|
||||||
long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
|
long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
|
||||||
|
long[] readReqsForMetaReplicasAfterGetAllLocations = new long[numOfMetaReplica];
|
||||||
long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
|
long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
|
||||||
long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
|
long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
|
||||||
long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
|
long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
|
||||||
@ -588,6 +586,16 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||||||
// There are more reads against all meta replica regions, including the primary region.
|
// There are more reads against all meta replica regions, including the primary region.
|
||||||
primaryIncreaseReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);
|
primaryIncreaseReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);
|
||||||
|
|
||||||
|
RegionLocator locator = tableForGet.getRegionLocator();
|
||||||
|
|
||||||
|
for (int j = 0; j < numOfMetaReplica * 3; j++) {
|
||||||
|
locator.getAllRegionLocations();
|
||||||
|
}
|
||||||
|
|
||||||
|
getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGetAllLocations);
|
||||||
|
primaryIncreaseReplicaIncrease(readReqsForMetaReplicasAfterGet,
|
||||||
|
readReqsForMetaReplicasAfterGetAllLocations);
|
||||||
|
|
||||||
// move one of regions so it meta cache may be invalid.
|
// move one of regions so it meta cache may be invalid.
|
||||||
HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());
|
HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());
|
||||||
|
|
||||||
@ -597,7 +605,7 @@ public class TestMetaRegionReplicaReplicationEndpoint {
|
|||||||
|
|
||||||
// There are read requests increase for primary meta replica.
|
// There are read requests increase for primary meta replica.
|
||||||
// For rest of meta replicas, there is no change as regionMove will tell the new location
|
// For rest of meta replicas, there is no change as regionMove will tell the new location
|
||||||
primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGet,
|
primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGetAllLocations,
|
||||||
readReqsForMetaReplicasAfterMove);
|
readReqsForMetaReplicasAfterMove);
|
||||||
// Move region again.
|
// Move region again.
|
||||||
HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
|
HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
|
||||||
|
Loading…
x
Reference in New Issue
Block a user