HBASE-26649 Support meta replica LoadBalance mode for RegionLocator#getAllRegionLocations() (#4044)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
056e8fb88b
commit
8c607397c1
|
@ -13,7 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
* </ol>
|
* </ol>
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
enum CatalogReplicaMode {
|
public enum CatalogReplicaMode {
|
||||||
NONE {
|
NONE {
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
|
||||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
@ -26,6 +27,7 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
|
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
|
||||||
import org.apache.hadoop.hbase.client.AsyncTable;
|
import org.apache.hadoop.hbase.client.AsyncTable;
|
||||||
|
@ -416,10 +418,40 @@ public final class ClientMetaTableAccessor {
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
|
int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
|
||||||
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
|
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
|
||||||
if (metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS,
|
// Get the region locator's meta replica mode.
|
||||||
HConstants.DEFAULT_USE_META_REPLICAS)) {
|
CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(metaTable.getConfiguration()
|
||||||
scan.setConsistency(Consistency.TIMELINE);
|
.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
|
||||||
|
|
||||||
|
switch (metaReplicaMode) {
|
||||||
|
case LOAD_BALANCE:
|
||||||
|
int numOfReplicas = 1;
|
||||||
|
try {
|
||||||
|
numOfReplicas = metaTable.getDescriptor().get().getRegionReplication();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Failed to get region replication for meta table");
|
||||||
|
}
|
||||||
|
if (numOfReplicas > 1) {
|
||||||
|
int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas);
|
||||||
|
|
||||||
|
// When the replicaId is 0, do not set to Consistency.TIMELINE
|
||||||
|
if (replicaId > 0) {
|
||||||
|
scan.setReplicaId(replicaId);
|
||||||
|
scan.setConsistency(Consistency.TIMELINE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case NONE:
|
||||||
|
// If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
|
||||||
|
if (metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS,
|
||||||
|
HConstants.DEFAULT_USE_META_REPLICAS)) {
|
||||||
|
scan.setConsistency(Consistency.TIMELINE);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
// Do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rowUpperLimit <= scannerCaching) {
|
if (rowUpperLimit <= scannerCaching) {
|
||||||
scan.setLimit(rowUpperLimit);
|
scan.setLimit(rowUpperLimit);
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,6 +50,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.commons.lang3.ObjectUtils;
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
import org.apache.hadoop.hbase.CatalogFamilyFormat;
|
import org.apache.hadoop.hbase.CatalogFamilyFormat;
|
||||||
|
import org.apache.hadoop.hbase.CatalogReplicaMode;
|
||||||
import org.apache.hadoop.hbase.HBaseIOException;
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
|
|
@ -36,6 +36,7 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CatalogReplicaMode;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
@ -385,6 +386,12 @@ public class TestMetaRegionReplicaReplication {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void PrimaryIncreaseReplicaIncrease(final long[] before, final long[] after) {
|
||||||
|
for (int i = 0; i < after.length; i++) {
|
||||||
|
assertTrue(after[i] > before[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
|
private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
|
||||||
// There are read requests increase for primary meta replica.
|
// There are read requests increase for primary meta replica.
|
||||||
assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
|
assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
|
||||||
|
@ -410,6 +417,7 @@ public class TestMetaRegionReplicaReplication {
|
||||||
final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
|
final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
|
||||||
long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
|
long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
|
||||||
long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
|
long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
|
||||||
|
long[] readReqsForMetaReplicasAfterGetAllLocations = new long[numOfMetaReplica];
|
||||||
long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
|
long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
|
||||||
long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
|
long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
|
||||||
long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
|
long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
|
||||||
|
@ -459,6 +467,16 @@ public class TestMetaRegionReplicaReplication {
|
||||||
// For rest of meta replicas, there are more reads against them.
|
// For rest of meta replicas, there are more reads against them.
|
||||||
primaryNoChangeReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);
|
primaryNoChangeReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);
|
||||||
|
|
||||||
|
RegionLocator locator = tableForGet.getRegionLocator();
|
||||||
|
|
||||||
|
for (int j = 0; j < numOfMetaReplica * 3; j ++) {
|
||||||
|
locator.getAllRegionLocations();
|
||||||
|
}
|
||||||
|
|
||||||
|
getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGetAllLocations);
|
||||||
|
PrimaryIncreaseReplicaIncrease(readReqsForMetaReplicasAfterGet,
|
||||||
|
readReqsForMetaReplicasAfterGetAllLocations);
|
||||||
|
|
||||||
// move one of regions so it meta cache may be invalid.
|
// move one of regions so it meta cache may be invalid.
|
||||||
HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());
|
HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());
|
||||||
|
|
||||||
|
@ -468,7 +486,7 @@ public class TestMetaRegionReplicaReplication {
|
||||||
|
|
||||||
// There are read requests increase for primary meta replica.
|
// There are read requests increase for primary meta replica.
|
||||||
// For rest of meta replicas, there is no change as regionMove will tell the new location
|
// For rest of meta replicas, there is no change as regionMove will tell the new location
|
||||||
primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGet,
|
primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGetAllLocations,
|
||||||
readReqsForMetaReplicasAfterMove);
|
readReqsForMetaReplicasAfterMove);
|
||||||
// Move region again.
|
// Move region again.
|
||||||
HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
|
HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
|
||||||
|
|
Loading…
Reference in New Issue