HBASE-5737 Minor Improvements related to balancer. (Ram)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1328057 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
ramkrishna 2012-04-19 18:05:31 +00:00
parent a843058f5f
commit 13b35ce82c
3 changed files with 29 additions and 18 deletions

View File

@ -199,7 +199,8 @@ public class AssignmentManager extends ZooKeeperListener {
* @throws IOException * @throws IOException
*/ */
public AssignmentManager(Server master, ServerManager serverManager, public AssignmentManager(Server master, ServerManager serverManager,
CatalogTracker catalogTracker, final ExecutorService service, MasterMetrics metrics) CatalogTracker catalogTracker, final LoadBalancer balancer,
final ExecutorService service, MasterMetrics metrics)
throws KeeperException, IOException { throws KeeperException, IOException {
super(master.getZooKeeper()); super(master.getZooKeeper());
this.master = master; this.master = master;
@ -218,7 +219,7 @@ public class AssignmentManager extends ZooKeeperListener {
this.zkTable = new ZKTable(this.master.getZooKeeper()); this.zkTable = new ZKTable(this.master.getZooKeeper());
this.maximumAssignmentAttempts = this.maximumAssignmentAttempts =
this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
this.balancer = LoadBalancerFactory.getLoadBalancer(conf); this.balancer = balancer;
this.threadPoolExecutorService = Executors.newCachedThreadPool(); this.threadPoolExecutorService = Executors.newCachedThreadPool();
this.masterMetrics = metrics;// can be null only with tests. this.masterMetrics = metrics;// can be null only with tests.
} }
@ -1820,8 +1821,7 @@ public class AssignmentManager extends ZooKeeperListener {
if (servers.isEmpty()) return null; if (servers.isEmpty()) return null;
RegionPlan randomPlan = new RegionPlan(state.getRegion(), null, RegionPlan randomPlan = null;
balancer.randomAssignment(state.getRegion(), servers));
boolean newPlan = false; boolean newPlan = false;
RegionPlan existingPlan = null; RegionPlan existingPlan = null;
@ -1839,6 +1839,8 @@ public class AssignmentManager extends ZooKeeperListener {
|| existingPlan.getDestination() == null || existingPlan.getDestination() == null
|| drainingServers.contains(existingPlan.getDestination())) { || drainingServers.contains(existingPlan.getDestination())) {
newPlan = true; newPlan = true;
randomPlan = new RegionPlan(state.getRegion(), null,
balancer.randomAssignment(state.getRegion(), servers));
this.regionPlans.put(encodedName, randomPlan); this.regionPlans.put(encodedName, randomPlan);
} }
} }
@ -2989,6 +2991,7 @@ public class AssignmentManager extends ZooKeeperListener {
public boolean isCarryingMeta(ServerName serverName) { public boolean isCarryingMeta(ServerName serverName) {
return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO); return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
} }
/** /**
* Check if the shutdown server carries the specific region. * Check if the shutdown server carries the specific region.
* We have a bunch of places that store region location * We have a bunch of places that store region location

View File

@ -426,9 +426,9 @@ Server {
this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
this.catalogTracker.start(); this.catalogTracker.start();
this.assignmentManager = new AssignmentManager(this, serverManager,
this.catalogTracker, this.executorService, this.metrics);
this.balancer = LoadBalancerFactory.getLoadBalancer(conf); this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
this.assignmentManager = new AssignmentManager(this, serverManager,
this.catalogTracker, this.balancer, this.executorService, this.metrics);
zooKeeper.registerListenerFirst(assignmentManager); zooKeeper.registerListenerFirst(assignmentManager);
this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
@ -585,12 +585,12 @@ Server {
org.apache.hadoop.hbase.catalog.MetaMigrationRemovingHTD. org.apache.hadoop.hbase.catalog.MetaMigrationRemovingHTD.
updateMetaWithNewHRI(this); updateMetaWithNewHRI(this);
this.balancer.setMasterServices(this);
// Fixup assignment manager status // Fixup assignment manager status
status.setStatus("Starting assignment manager"); status.setStatus("Starting assignment manager");
this.assignmentManager.joinCluster(onlineServers); this.assignmentManager.joinCluster(onlineServers);
this.balancer.setClusterStatus(getClusterStatus()); this.balancer.setClusterStatus(getClusterStatus());
this.balancer.setMasterServices(this);
// Fixing up missing daughters if any // Fixing up missing daughters if any
status.setStatus("Fixing up missing daughters"); status.setStatus("Fixing up missing daughters");

View File

@ -316,9 +316,11 @@ public class TestAssignmentManager {
// We need a mocked catalog tracker. // We need a mocked catalog tracker.
CatalogTracker ct = Mockito.mock(CatalogTracker.class); CatalogTracker ct = Mockito.mock(CatalogTracker.class);
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server
.getConfiguration());
// Create an AM. // Create an AM.
AssignmentManager am = AssignmentManager am = new AssignmentManager(this.server,
new AssignmentManager(this.server, this.serverManager, ct, executor, null); this.serverManager, ct, balancer, executor, null);
try { try {
// Make sure our new AM gets callbacks; once registered, can't unregister. // Make sure our new AM gets callbacks; once registered, can't unregister.
// Thats ok because we make a new zk watcher for each test. // Thats ok because we make a new zk watcher for each test.
@ -382,9 +384,11 @@ public class TestAssignmentManager {
// We need a mocked catalog tracker. // We need a mocked catalog tracker.
CatalogTracker ct = Mockito.mock(CatalogTracker.class); CatalogTracker ct = Mockito.mock(CatalogTracker.class);
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server
.getConfiguration());
// Create an AM. // Create an AM.
AssignmentManager am = AssignmentManager am = new AssignmentManager(this.server,
new AssignmentManager(this.server, this.serverManager, ct, executor, null); this.serverManager, ct, balancer, executor, null);
try { try {
// Make sure our new AM gets callbacks; once registered, can't unregister. // Make sure our new AM gets callbacks; once registered, can't unregister.
// Thats ok because we make a new zk watcher for each test. // Thats ok because we make a new zk watcher for each test.
@ -457,9 +461,11 @@ public class TestAssignmentManager {
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, hri, -1)).thenReturn(true); Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, hri, -1)).thenReturn(true);
// Need a mocked catalog tracker. // Need a mocked catalog tracker.
CatalogTracker ct = Mockito.mock(CatalogTracker.class); CatalogTracker ct = Mockito.mock(CatalogTracker.class);
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server
.getConfiguration());
// Create an AM. // Create an AM.
AssignmentManager am = AssignmentManager am = new AssignmentManager(this.server,
new AssignmentManager(this.server, this.serverManager, ct, null, null); this.serverManager, ct, balancer, null, null);
try { try {
// First make sure my mock up basically works. Unassign a region. // First make sure my mock up basically works. Unassign a region.
unassign(am, SERVERNAME_A, hri); unassign(am, SERVERNAME_A, hri);
@ -607,8 +613,10 @@ public class TestAssignmentManager {
Mockito.when(ct.getConnection()).thenReturn(connection); Mockito.when(ct.getConnection()).thenReturn(connection);
// Create and startup an executor. Used by AM handling zk callbacks. // Create and startup an executor. Used by AM handling zk callbacks.
ExecutorService executor = startupMasterExecutor("mockedAMExecutor"); ExecutorService executor = startupMasterExecutor("mockedAMExecutor");
AssignmentManagerWithExtrasForTesting am = LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server
new AssignmentManagerWithExtrasForTesting(server, manager, ct, executor); .getConfiguration());
AssignmentManagerWithExtrasForTesting am = new AssignmentManagerWithExtrasForTesting(
server, manager, ct, balancer, executor);
return am; return am;
} }
@ -624,10 +632,10 @@ public class TestAssignmentManager {
AtomicBoolean gate = new AtomicBoolean(true); AtomicBoolean gate = new AtomicBoolean(true);
public AssignmentManagerWithExtrasForTesting(final Server master, public AssignmentManagerWithExtrasForTesting(final Server master,
final ServerManager serverManager, final ServerManager serverManager, final CatalogTracker catalogTracker,
final CatalogTracker catalogTracker, final ExecutorService service) final LoadBalancer balancer, final ExecutorService service)
throws KeeperException, IOException { throws KeeperException, IOException {
super(master, serverManager, catalogTracker, service, null); super(master, serverManager, catalogTracker, balancer, service, null);
this.es = service; this.es = service;
this.ct = catalogTracker; this.ct = catalogTracker;
} }