HBASE-10137 GeneralBulkAssigner with retain assignment plan can be used in EnableTableHandler to bulk assign the regions
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1551797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4fb0da7f84
commit
f97e2d8d19
|
@ -3490,4 +3490,11 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// remove the region plan as well just in case.
|
||||
clearRegionPlan(regionInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Instance of load balancer
|
||||
*/
|
||||
public LoadBalancer getBalancer() {
|
||||
return this.balancer;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ public class GeneralBulkAssigner extends BulkAssigner {
|
|||
final AssignmentManager assignmentManager;
|
||||
final boolean waitTillAllAssigned;
|
||||
|
||||
GeneralBulkAssigner(final Server server,
|
||||
public GeneralBulkAssigner(final Server server,
|
||||
final Map<ServerName, List<HRegionInfo>> bulkPlan,
|
||||
final AssignmentManager am, final boolean waitTillAllAssigned) {
|
||||
super(server);
|
||||
|
|
|
@ -559,7 +559,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
} else {
|
||||
// multiple new servers in the cluster on this same host
|
||||
int size = localServers.size();
|
||||
ServerName target = localServers.get(RANDOM.nextInt(size));
|
||||
ServerName target =
|
||||
localServers.contains(oldServerName) ? oldServerName : localServers.get(RANDOM
|
||||
.nextInt(size));
|
||||
assignments.get(target).add(region);
|
||||
numRetainedAssigments++;
|
||||
}
|
||||
|
|
|
@ -19,8 +19,9 @@
|
|||
package org.apache.hadoop.hbase.master.handler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -38,9 +39,9 @@ import org.apache.hadoop.hbase.executor.EventHandler;
|
|||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.BulkAssigner;
|
||||
import org.apache.hadoop.hbase.master.GeneralBulkAssigner;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.RegionStates;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager;
|
||||
|
@ -173,22 +174,29 @@ public class EnableTableHandler extends EventHandler {
|
|||
// Set table enabling flag up in zk.
|
||||
this.assignmentManager.getZKTable().setEnablingTable(this.tableName);
|
||||
boolean done = false;
|
||||
ServerManager serverManager = ((HMaster)this.server).getServerManager();
|
||||
// Get the regions of this table. We're done when all listed
|
||||
// tables are onlined.
|
||||
List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations = MetaReader
|
||||
.getTableRegionsAndLocations(this.catalogTracker, tableName, true);
|
||||
int countOfRegionsInTable = tableRegionsAndLocations.size();
|
||||
List<HRegionInfo> regions = regionsToAssignWithServerName(tableRegionsAndLocations);
|
||||
int regionsCount = regions.size();
|
||||
Map<HRegionInfo, ServerName> regionsToAssign =
|
||||
regionsToAssignWithServerName(tableRegionsAndLocations);
|
||||
int regionsCount = regionsToAssign.size();
|
||||
if (regionsCount == 0) {
|
||||
done = true;
|
||||
}
|
||||
LOG.info("Table '" + this.tableName + "' has " + countOfRegionsInTable
|
||||
+ " regions, of which " + regionsCount + " are offline.");
|
||||
BulkEnabler bd = new BulkEnabler(this.server, regions, countOfRegionsInTable,
|
||||
true);
|
||||
List<ServerName> onlineServers = serverManager.createDestinationServersList();
|
||||
Map<ServerName, List<HRegionInfo>> bulkPlan =
|
||||
this.assignmentManager.getBalancer().retainAssignment(regionsToAssign, onlineServers);
|
||||
LOG.info("Bulk assigning " + regionsCount + " region(s) across " + bulkPlan.size()
|
||||
+ " server(s), retainAssignment=true");
|
||||
|
||||
BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this.assignmentManager, true);
|
||||
try {
|
||||
if (bd.bulkAssign()) {
|
||||
if (ba.bulkAssign()) {
|
||||
done = true;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -214,19 +222,16 @@ public class EnableTableHandler extends EventHandler {
|
|||
* @return List of regions neither in transition nor assigned.
|
||||
* @throws IOException
|
||||
*/
|
||||
private List<HRegionInfo> regionsToAssignWithServerName(
|
||||
private Map<HRegionInfo, ServerName> regionsToAssignWithServerName(
|
||||
final List<Pair<HRegionInfo, ServerName>> regionsInMeta) throws IOException {
|
||||
ServerManager serverManager = ((HMaster) this.server).getServerManager();
|
||||
List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
|
||||
Map<HRegionInfo, ServerName> regionsToAssign =
|
||||
new HashMap<HRegionInfo, ServerName>(regionsInMeta.size());
|
||||
RegionStates regionStates = this.assignmentManager.getRegionStates();
|
||||
for (Pair<HRegionInfo, ServerName> regionLocation : regionsInMeta) {
|
||||
HRegionInfo hri = regionLocation.getFirst();
|
||||
ServerName sn = regionLocation.getSecond();
|
||||
if (regionStates.isRegionOffline(hri)) {
|
||||
if (sn != null && serverManager.isServerOnline(sn)) {
|
||||
this.assignmentManager.addPlan(hri.getEncodedName(), new RegionPlan(hri, null, sn));
|
||||
}
|
||||
regions.add(hri);
|
||||
regionsToAssign.put(hri, sn);
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping assign for the region " + hri + " during enable table "
|
||||
|
@ -234,67 +239,6 @@ public class EnableTableHandler extends EventHandler {
|
|||
}
|
||||
}
|
||||
}
|
||||
return regions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run bulk enable.
|
||||
*/
|
||||
class BulkEnabler extends BulkAssigner {
|
||||
private final List<HRegionInfo> regions;
|
||||
// Count of regions in table at time this assign was launched.
|
||||
private final int countOfRegionsInTable;
|
||||
|
||||
BulkEnabler(final Server server, final List<HRegionInfo> regions,
|
||||
final int countOfRegionsInTable, boolean retainAssignment) {
|
||||
super(server);
|
||||
this.regions = regions;
|
||||
this.countOfRegionsInTable = countOfRegionsInTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void populatePool(ExecutorService pool) throws IOException {
|
||||
// In case of masterRestart always go with single assign. Going thro
|
||||
// roundRobinAssignment will use bulkassign which may lead to double assignment.
|
||||
for (HRegionInfo region : regions) {
|
||||
if (assignmentManager.getRegionStates()
|
||||
.isRegionInTransition(region)) {
|
||||
continue;
|
||||
}
|
||||
final HRegionInfo hri = region;
|
||||
pool.execute(Trace.wrap("BulkEnabler.populatePool",new Runnable() {
|
||||
public void run() {
|
||||
assignmentManager.assign(hri, true);
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean waitUntilDone(long timeout)
|
||||
throws InterruptedException {
|
||||
long startTime = System.currentTimeMillis();
|
||||
long remaining = timeout;
|
||||
List<HRegionInfo> regions = null;
|
||||
int lastNumberOfRegions = 0;
|
||||
while (!server.isStopped() && remaining > 0) {
|
||||
Thread.sleep(waitingTimeForEvents);
|
||||
regions = assignmentManager.getRegionStates()
|
||||
.getRegionsOfTable(tableName);
|
||||
if (isDone(regions)) break;
|
||||
|
||||
// Punt on the timeout as long we make progress
|
||||
if (regions.size() > lastNumberOfRegions) {
|
||||
lastNumberOfRegions = regions.size();
|
||||
timeout += waitingTimeForEvents;
|
||||
}
|
||||
remaining = timeout - (System.currentTimeMillis() - startTime);
|
||||
}
|
||||
return isDone(regions);
|
||||
}
|
||||
|
||||
private boolean isDone(final List<HRegionInfo> regions) {
|
||||
return regions != null && regions.size() >= this.countOfRegionsInTable;
|
||||
}
|
||||
return regionsToAssign;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -880,7 +880,7 @@ public class TestAdmin {
|
|||
for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
|
||||
assertEquals(regions2.get(entry.getKey()), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Multi-family scenario. Tests forcing split from client and
|
||||
|
|
|
@ -1158,6 +1158,18 @@ public class TestAssignmentManager {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean assign(ServerName destination, List<HRegionInfo> regions) {
|
||||
if (enabling) {
|
||||
for (HRegionInfo region : regions) {
|
||||
assignmentCount++;
|
||||
this.regionOnline(region, SERVERNAME_A);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return super.assign(destination, regions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assign(List<HRegionInfo> regions)
|
||||
throws IOException, InterruptedException {
|
||||
|
|
Loading…
Reference in New Issue