HBASE-20741 Split of a region with replicas creates all daughter regions and its replica in same server (Ram)
Signed-off-by: Huaxiang Sun, Michael Stack
This commit is contained in:
parent
e4c4035ed8
commit
83131b1ac4
|
@ -663,16 +663,22 @@ public class AssignmentManager implements ServerListener {
|
||||||
* scheme). If at assign-time, the target chosen is no longer up, that's fine, the
|
* scheme). If at assign-time, the target chosen is no longer up, that's fine, the
|
||||||
* AssignProcedure will ask the balancer for a new target, and so on.
|
* AssignProcedure will ask the balancer for a new target, and so on.
|
||||||
*/
|
*/
|
||||||
public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(
|
public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
|
||||||
List<RegionInfo> hris) {
|
List<ServerName> serversToExclude) {
|
||||||
if (hris.isEmpty()) {
|
if (hris.isEmpty()) {
|
||||||
return new TransitRegionStateProcedure[0];
|
return new TransitRegionStateProcedure[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (serversToExclude != null
|
||||||
|
&& this.master.getServerManager().getOnlineServersList().size() == 1) {
|
||||||
|
LOG.debug("Only one region server found and hence going ahead with the assignment");
|
||||||
|
serversToExclude = null;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
// Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
|
// Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
|
||||||
// a better job if it has all the assignments in the one lump.
|
// a better job if it has all the assignments in the one lump.
|
||||||
Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
|
Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
|
||||||
this.master.getServerManager().createDestinationServersList(null));
|
this.master.getServerManager().createDestinationServersList(serversToExclude));
|
||||||
// Return mid-method!
|
// Return mid-method!
|
||||||
return createAssignProcedures(assignments);
|
return createAssignProcedures(assignments);
|
||||||
} catch (HBaseIOException hioe) {
|
} catch (HBaseIOException hioe) {
|
||||||
|
@ -682,6 +688,17 @@ public class AssignmentManager implements ServerListener {
|
||||||
return createAssignProcedures(hris);
|
return createAssignProcedures(hris);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create round-robin assigns. Use on table creation to distribute out regions across cluster.
|
||||||
|
* @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
|
||||||
|
* to populate the assigns with targets chosen using round-robin (default balancer
|
||||||
|
* scheme). If at assign-time, the target chosen is no longer up, that's fine, the
|
||||||
|
* AssignProcedure will ask the balancer for a new target, and so on.
|
||||||
|
*/
|
||||||
|
public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
|
||||||
|
return createRoundRobinAssignProcedures(hris, null);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
|
static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
|
||||||
if (left.getRegion().isMetaRegion()) {
|
if (left.getRegion().isMetaRegion()) {
|
||||||
|
|
|
@ -18,12 +18,15 @@
|
||||||
package org.apache.hadoop.hbase.master.assignment;
|
package org.apache.hadoop.hbase.master.assignment;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ListIterator;
|
import java.util.ListIterator;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.ArrayUtils;
|
||||||
import org.apache.hadoop.hbase.HBaseIOException;
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
@ -136,38 +139,58 @@ final class AssignmentManagerUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env,
|
private static TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env,
|
||||||
Stream<RegionInfo> regions, int regionReplication, ServerName targetServer) {
|
List<RegionInfo> regions, int regionReplication, ServerName targetServer) {
|
||||||
return regions
|
// create the assign procs only for the primary region using the targetServer
|
||||||
.flatMap(hri -> IntStream.range(0, regionReplication)
|
TransitRegionStateProcedure[] primaryRegionProcs = regions.stream()
|
||||||
.mapToObj(i -> RegionReplicaUtil.getRegionInfoForReplica(hri, i)))
|
.flatMap(hri -> IntStream.range(0, 1) // yes, only the primary
|
||||||
.map(env.getAssignmentManager().getRegionStates()::getOrCreateRegionStateNode)
|
.mapToObj(i -> RegionReplicaUtil.getRegionInfoForReplica(hri, i)))
|
||||||
.map(regionNode -> {
|
.map(env.getAssignmentManager().getRegionStates()::getOrCreateRegionStateNode)
|
||||||
TransitRegionStateProcedure proc =
|
.map(regionNode -> {
|
||||||
TransitRegionStateProcedure.assign(env, regionNode.getRegionInfo(), targetServer);
|
TransitRegionStateProcedure proc =
|
||||||
regionNode.lock();
|
TransitRegionStateProcedure.assign(env, regionNode.getRegionInfo(), targetServer);
|
||||||
try {
|
regionNode.lock();
|
||||||
// should never fail, as we have the exclusive region lock, and the region is newly
|
try {
|
||||||
// created, or has been successfully closed so should not be on any servers, so SCP will
|
// should never fail, as we have the exclusive region lock, and the region is newly
|
||||||
// not process it either.
|
// created, or has been successfully closed so should not be on any servers, so SCP will
|
||||||
assert !regionNode.isInTransition();
|
// not process it either.
|
||||||
regionNode.setProcedure(proc);
|
assert !regionNode.isInTransition();
|
||||||
} finally {
|
regionNode.setProcedure(proc);
|
||||||
regionNode.unlock();
|
} finally {
|
||||||
}
|
regionNode.unlock();
|
||||||
return proc;
|
}
|
||||||
}).toArray(TransitRegionStateProcedure[]::new);
|
return proc;
|
||||||
|
}).toArray(TransitRegionStateProcedure[]::new);
|
||||||
|
if (regionReplication == 1) {
|
||||||
|
// this is the default case
|
||||||
|
return primaryRegionProcs;
|
||||||
|
}
|
||||||
|
// collect the replica region infos
|
||||||
|
List<RegionInfo> replicaRegionInfos =
|
||||||
|
new ArrayList<RegionInfo>(regions.size() * (regionReplication - 1));
|
||||||
|
for (RegionInfo hri : regions) {
|
||||||
|
// start the index from 1
|
||||||
|
for (int i = 1; i < regionReplication; i++) {
|
||||||
|
replicaRegionInfos.add(RegionReplicaUtil.getRegionInfoForReplica(hri, i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// create round robin procs. Note that we exclude the primary region's target server
|
||||||
|
TransitRegionStateProcedure[] replicaRegionAssignProcs =
|
||||||
|
env.getAssignmentManager().createRoundRobinAssignProcedures(replicaRegionInfos,
|
||||||
|
Collections.singletonList(targetServer));
|
||||||
|
// combine both the procs and return the result
|
||||||
|
return ArrayUtils.addAll(primaryRegionProcs, replicaRegionAssignProcs);
|
||||||
}
|
}
|
||||||
|
|
||||||
static TransitRegionStateProcedure[] createAssignProceduresForOpeningNewRegions(
|
static TransitRegionStateProcedure[] createAssignProceduresForOpeningNewRegions(
|
||||||
MasterProcedureEnv env, Stream<RegionInfo> regions, int regionReplication,
|
MasterProcedureEnv env, List<RegionInfo> regions, int regionReplication,
|
||||||
ServerName targetServer) {
|
ServerName targetServer) {
|
||||||
return createAssignProcedures(env, regions, regionReplication, targetServer);
|
return createAssignProcedures(env, regions, regionReplication, targetServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void reopenRegionsForRollback(MasterProcedureEnv env, Stream<RegionInfo> regions,
|
static void reopenRegionsForRollback(MasterProcedureEnv env, List<RegionInfo> regions,
|
||||||
int regionReplication, ServerName targetServer) {
|
int regionReplication, ServerName targetServer) {
|
||||||
TransitRegionStateProcedure[] procs =
|
TransitRegionStateProcedure[] procs =
|
||||||
createAssignProcedures(env, regions, regionReplication, targetServer);
|
createAssignProcedures(env, regions, regionReplication, targetServer);
|
||||||
env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
|
env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -663,7 +664,7 @@ public class MergeTableRegionsProcedure
|
||||||
* Rollback close regions
|
* Rollback close regions
|
||||||
**/
|
**/
|
||||||
private void rollbackCloseRegionsForMerge(MasterProcedureEnv env) throws IOException {
|
private void rollbackCloseRegionsForMerge(MasterProcedureEnv env) throws IOException {
|
||||||
AssignmentManagerUtil.reopenRegionsForRollback(env, Stream.of(regionsToMerge),
|
AssignmentManagerUtil.reopenRegionsForRollback(env, Arrays.asList(regionsToMerge),
|
||||||
getRegionReplication(env), getServerName(env));
|
getRegionReplication(env), getServerName(env));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -676,7 +677,7 @@ public class MergeTableRegionsProcedure
|
||||||
private TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env)
|
private TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return AssignmentManagerUtil.createAssignProceduresForOpeningNewRegions(env,
|
return AssignmentManagerUtil.createAssignProceduresForOpeningNewRegions(env,
|
||||||
Stream.of(mergedRegion), getRegionReplication(env), getServerName(env));
|
Collections.singletonList(mergedRegion), getRegionReplication(env), getServerName(env));
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getRegionReplication(final MasterProcedureEnv env) throws IOException {
|
private int getRegionReplication(final MasterProcedureEnv env) throws IOException {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.InterruptedIOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -539,8 +540,9 @@ public class SplitTableRegionProcedure
|
||||||
* Rollback close parent region
|
* Rollback close parent region
|
||||||
*/
|
*/
|
||||||
private void openParentRegion(MasterProcedureEnv env) throws IOException {
|
private void openParentRegion(MasterProcedureEnv env) throws IOException {
|
||||||
AssignmentManagerUtil.reopenRegionsForRollback(env, Stream.of(getParentRegion()),
|
AssignmentManagerUtil.reopenRegionsForRollback(env,
|
||||||
getRegionReplication(env), getParentRegionServerName(env));
|
Collections.singletonList((getParentRegion())), getRegionReplication(env),
|
||||||
|
getParentRegionServerName(env));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -813,9 +815,11 @@ public class SplitTableRegionProcedure
|
||||||
|
|
||||||
private TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env)
|
private TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return AssignmentManagerUtil.createAssignProceduresForOpeningNewRegions(env,
|
List<RegionInfo> hris = new ArrayList<RegionInfo>(2);
|
||||||
Stream.of(daughter_1_RI, daughter_2_RI), getRegionReplication(env),
|
hris.add(daughter_1_RI);
|
||||||
getParentRegionServerName(env));
|
hris.add(daughter_2_RI);
|
||||||
|
return AssignmentManagerUtil.createAssignProceduresForOpeningNewRegions(env, hris,
|
||||||
|
getRegionReplication(env), getParentRegionServerName(env));
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getRegionReplication(final MasterProcedureEnv env) throws IOException {
|
private int getRegionReplication(final MasterProcedureEnv env) throws IOException {
|
||||||
|
|
|
@ -1271,13 +1271,30 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
List<RegionInfo> lastFewRegions = new ArrayList<>();
|
List<RegionInfo> lastFewRegions = new ArrayList<>();
|
||||||
// assign the remaining by going through the list and try to assign to servers one-by-one
|
// assign the remaining by going through the list and try to assign to servers one-by-one
|
||||||
int serverIdx = RANDOM.nextInt(numServers);
|
int serverIdx = RANDOM.nextInt(numServers);
|
||||||
for (RegionInfo region : unassignedRegions) {
|
OUTER : for (RegionInfo region : unassignedRegions) {
|
||||||
boolean assigned = false;
|
boolean assigned = false;
|
||||||
for (int j = 0; j < numServers; j++) { // try all servers one by one
|
INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
|
||||||
ServerName serverName = servers.get((j + serverIdx) % numServers);
|
ServerName serverName = servers.get((j + serverIdx) % numServers);
|
||||||
if (!cluster.wouldLowerAvailability(region, serverName)) {
|
if (!cluster.wouldLowerAvailability(region, serverName)) {
|
||||||
List<RegionInfo> serverRegions =
|
List<RegionInfo> serverRegions =
|
||||||
assignments.computeIfAbsent(serverName, k -> new ArrayList<>());
|
assignments.computeIfAbsent(serverName, k -> new ArrayList<>());
|
||||||
|
if (!RegionReplicaUtil.isDefaultReplica(region.getReplicaId())) {
|
||||||
|
// if the region is not a default replica
|
||||||
|
// check if the assignments map has the other replica region on this server
|
||||||
|
for (RegionInfo hri : serverRegions) {
|
||||||
|
if (RegionReplicaUtil.isReplicasForSameRegion(region, hri)) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Skipping the server, " + serverName
|
||||||
|
+ " , got the same server for the region " + region);
|
||||||
|
}
|
||||||
|
// Do not allow this case. The regions in unassignedRegions are here because the
|
||||||
|
// replica region could not be assigned earlier without lowering availability.
|
||||||
|
// So when we assign here we should ensure, as far as possible, that the server being
|
||||||
|
// selected does not already hold another replica of the same region.
|
||||||
|
continue INNER; // continue the inner loop, ie go to the next server
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
serverRegions.add(region);
|
serverRegions.add(region);
|
||||||
cluster.doAssignRegion(region, serverName);
|
cluster.doAssignRegion(region, serverName);
|
||||||
serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
|
serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
|
||||||
|
|
Loading…
Reference in New Issue