HBASE-4298 Support to drain RS nodes through ZK

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1197256 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-11-03 18:34:41 +00:00
parent a9992a53c4
commit 92b57170e9
7 changed files with 411 additions and 13 deletions

View File

@ -801,6 +801,7 @@ Release 0.92.0 - Unreleased
HBASE-4219 Addendum for failure of TestHFileBlock HBASE-4219 Addendum for failure of TestHFileBlock
HBASE-4377 [hbck] Offline rebuild .META. from fs data only HBASE-4377 [hbck] Offline rebuild .META. from fs data only
(Jonathan Hsieh) (Jonathan Hsieh)
HBASE-4298 Support to drain RS nodes through ZK (Aravind Gottipati)
Release 0.90.5 - Unreleased Release 0.90.5 - Unreleased

View File

@ -1606,37 +1606,54 @@ public class AssignmentManager extends ZooKeeperListener {
RegionPlan getRegionPlan(final RegionState state, RegionPlan getRegionPlan(final RegionState state,
final ServerName serverToExclude, final boolean forceNewPlan) { final ServerName serverToExclude, final boolean forceNewPlan) {
// Pickup existing plan or make a new one // Pickup existing plan or make a new one
String encodedName = state.getRegion().getEncodedName(); final String encodedName = state.getRegion().getEncodedName();
List<ServerName> servers = this.serverManager.getOnlineServersList(); final List<ServerName> servers = this.serverManager.getOnlineServersList();
// The remove below hinges on the fact that the call to final List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
// serverManager.getOnlineServersList() returns a copy
if (serverToExclude != null) servers.remove(serverToExclude); if (serverToExclude != null) servers.remove(serverToExclude);
// Loop through the draining server list and remove them from the server
// list.
if (!drainingServers.isEmpty()) {
for (final ServerName server: drainingServers) {
LOG.debug("Removing draining server: " + server +
" from eligible server pool.");
servers.remove(server);
}
}
if (servers.isEmpty()) return null; if (servers.isEmpty()) return null;
RegionPlan randomPlan = new RegionPlan(state.getRegion(), null, RegionPlan randomPlan = new RegionPlan(state.getRegion(), null,
balancer.randomAssignment(servers)); balancer.randomAssignment(servers));
boolean newPlan = false; boolean newPlan = false;
RegionPlan existingPlan = null; RegionPlan existingPlan = null;
synchronized (this.regionPlans) { synchronized (this.regionPlans) {
existingPlan = this.regionPlans.get(encodedName); existingPlan = this.regionPlans.get(encodedName);
if (existingPlan != null && existingPlan.getDestination() != null) { if (existingPlan != null && existingPlan.getDestination() != null) {
LOG.debug("Found an existing plan for " + LOG.debug("Found an existing plan for " +
state.getRegion().getRegionNameAsString() + state.getRegion().getRegionNameAsString() +
" destination server is + " + existingPlan.getDestination().toString()); " destination server is + " + existingPlan.getDestination().toString());
} }
if (forceNewPlan || existingPlan == null
if (forceNewPlan
|| existingPlan == null
|| existingPlan.getDestination() == null || existingPlan.getDestination() == null
|| existingPlan.getDestination().equals(serverToExclude)) { || drainingServers.contains(existingPlan.getDestination())) {
newPlan = true; newPlan = true;
this.regionPlans.put(encodedName, randomPlan); this.regionPlans.put(encodedName, randomPlan);
} }
} }
if (newPlan) { if (newPlan) {
debugLog(state.getRegion(), "No previous transition plan was found (or we are ignoring " + debugLog(state.getRegion(), "No previous transition plan was found (or we are ignoring " +
"an existing plan) for " + state.getRegion().getRegionNameAsString() + "an existing plan) for " + state.getRegion().getRegionNameAsString() +
" so generated a random one; " + randomPlan + "; " + " so generated a random one; " + randomPlan + "; " +
serverManager.countOfRegionServers() + serverManager.countOfRegionServers() +
" (online=" + serverManager.getOnlineServers().size() + " (online=" + serverManager.getOnlineServers().size() +
", exclude=" + serverToExclude + ") available servers"); ", exclude=" + drainingServers.size() + ") available servers");
return randomPlan; return randomPlan;
} }
debugLog(state.getRegion(), "Using pre-existing plan for region " + debugLog(state.getRegion(), "Using pre-existing plan for region " +

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ClusterId;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -131,6 +132,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
private ActiveMasterManager activeMasterManager; private ActiveMasterManager activeMasterManager;
// Region server tracker // Region server tracker
private RegionServerTracker regionServerTracker; private RegionServerTracker regionServerTracker;
// Draining region server tracker
private DrainingServerTracker drainingServerTracker;
// RPC server for the HMaster // RPC server for the HMaster
private final RpcServer rpcServer; private final RpcServer rpcServer;
@ -370,6 +373,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.serverManager); this.serverManager);
this.regionServerTracker.start(); this.regionServerTracker.start();
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
this.serverManager);
this.drainingServerTracker.start();
// Set the cluster as up. If new RSs, they'll be waiting on this before // Set the cluster as up. If new RSs, they'll be waiting on this before
// going ahead with their startup. // going ahead with their startup.
this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);

View File

@ -80,6 +80,13 @@ public class ServerManager {
private final Map<ServerName, HRegionInterface> serverConnections = private final Map<ServerName, HRegionInterface> serverConnections =
new HashMap<ServerName, HRegionInterface>(); new HashMap<ServerName, HRegionInterface>();
/**
* List of region servers <ServerName> that should not get any more new
* regions.
*/
private final ArrayList<ServerName> drainingServers =
new ArrayList<ServerName>();
private final Server master; private final Server master;
private final MasterServices services; private final MasterServices services;
private final HConnection connection; private final HConnection connection;
@ -367,6 +374,43 @@ public class ServerManager {
carryingRoot + ", meta=" + carryingMeta); carryingRoot + ", meta=" + carryingMeta);
} }
/*
* Remove the server from the drain list.
*/
public boolean removeServerFromDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode>
if (!this.isServerOnline(sn)) {
LOG.warn("Server " + sn + " is not currently online. " +
"Removing from draining list anyway, as requested.");
}
// Remove the server from the draining servers lists.
return this.drainingServers.remove(sn);
}
/*
* Add the server to the drain list.
*/
public boolean addServerToDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode>
if (!this.isServerOnline(sn)) {
LOG.warn("Server " + sn + " is not currently online. " +
"Ignoring request to add it to draining list.");
return false;
}
// Add the server to the draining servers lists, if it's not already in
// it.
if (this.drainingServers.contains(sn)) {
LOG.warn("Server " + sn + " is already in the draining server list." +
"Ignoring request to add it again.");
return false;
}
return this.drainingServers.add(sn);
}
// RPC methods to region servers // RPC methods to region servers
/** /**
@ -489,6 +533,13 @@ public class ServerManager {
return new ArrayList<ServerName>(this.onlineServers.keySet()); return new ArrayList<ServerName>(this.onlineServers.keySet());
} }
/**
* @return A copy of the internal list of draining servers.
*/
public List<ServerName> getDrainingServersList() {
return new ArrayList<ServerName>(this.drainingServers);
}
public boolean isServerOnline(ServerName serverName) { public boolean isServerOnline(ServerName serverName) {
return onlineServers.containsKey(serverName); return onlineServers.containsKey(serverName);
} }

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.zookeeper.KeeperException;
/**
* Tracks the list of draining region servers via ZK.
*
* <p>This class is responsible for watching for changes to the draining
* servers list. It handles adds/deletes in the draining RS list and
* watches each node.
*
* <p>If an RS gets deleted from draining list, we call
* {@link ServerManager#removeServerFromDrainList(ServerName)}
*
* <p>If an RS gets added to the draining list, we add a watcher to it and call
* {@link ServerManager#addServerToDrainList(ServerName)}
*
*/
public class DrainingServerTracker extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(DrainingServerTracker.class);
private ServerManager serverManager;
private NavigableSet<ServerName> drainingServers = new TreeSet<ServerName>();
private Abortable abortable;
public DrainingServerTracker(ZooKeeperWatcher watcher,
Abortable abortable, ServerManager serverManager) {
super(watcher);
this.abortable = abortable;
this.serverManager = serverManager;
}
/**
* Starts the tracking of draining RegionServers.
*
* <p>All Draining RSs will be tracked after this method is called.
*
* @throws KeeperException
*/
public void start() throws KeeperException, IOException {
watcher.registerListener(this);
List<String> servers =
ZKUtil.listChildrenAndWatchThem(watcher, watcher.drainingZNode);
add(servers);
}
private void add(final List<String> servers) throws IOException {
synchronized(this.drainingServers) {
this.drainingServers.clear();
for (String n: servers) {
final ServerName sn = new ServerName(ZKUtil.getNodeName(n));
this.drainingServers.add(sn);
this.serverManager.addServerToDrainList(sn);
LOG.info("Draining RS node created, adding to list [" +
sn + "]");
}
}
}
private void remove(final ServerName sn) {
synchronized(this.drainingServers) {
this.drainingServers.remove(sn);
this.serverManager.removeServerFromDrainList(sn);
}
}
@Override
public void nodeDeleted(final String path) {
if(path.startsWith(watcher.drainingZNode)) {
final ServerName sn = new ServerName(ZKUtil.getNodeName(path));
LOG.info("Draining RS node deleted, removing from list [" +
sn + "]");
remove(sn);
}
}
@Override
public void nodeChildrenChanged(final String path) {
if(path.equals(watcher.drainingZNode)) {
try {
final List<String> newNodes =
ZKUtil.listChildrenAndWatchThem(watcher, watcher.drainingZNode);
add(newNodes);
} catch (KeeperException e) {
abortable.abort("Unexpected zk exception getting RS nodes", e);
} catch (IOException e) {
abortable.abort("Unexpected zk exception getting RS nodes", e);
}
}
}
}

View File

@ -73,6 +73,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
public String rootServerZNode; public String rootServerZNode;
// znode containing ephemeral nodes of the regionservers // znode containing ephemeral nodes of the regionservers
public String rsZNode; public String rsZNode;
// znode containing ephemeral nodes of the draining regionservers
public String drainingZNode;
// znode of currently active master // znode of currently active master
public String masterAddressZNode; public String masterAddressZNode;
// znode containing the current cluster state // znode containing the current cluster state
@ -138,6 +140,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
ZKUtil.createAndFailSilent(this, baseZNode); ZKUtil.createAndFailSilent(this, baseZNode);
ZKUtil.createAndFailSilent(this, assignmentZNode); ZKUtil.createAndFailSilent(this, assignmentZNode);
ZKUtil.createAndFailSilent(this, rsZNode); ZKUtil.createAndFailSilent(this, rsZNode);
ZKUtil.createAndFailSilent(this, drainingZNode);
ZKUtil.createAndFailSilent(this, tableZNode); ZKUtil.createAndFailSilent(this, tableZNode);
ZKUtil.createAndFailSilent(this, splitLogZNode); ZKUtil.createAndFailSilent(this, splitLogZNode);
} catch (KeeperException e) { } catch (KeeperException e) {
@ -175,6 +178,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
conf.get("zookeeper.znode.rootserver", "root-region-server")); conf.get("zookeeper.znode.rootserver", "root-region-server"));
rsZNode = ZKUtil.joinZNode(baseZNode, rsZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.rs", "rs")); conf.get("zookeeper.znode.rs", "rs"));
drainingZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.draining.rs", "draining"));
masterAddressZNode = ZKUtil.joinZNode(baseZNode, masterAddressZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.master", "master")); conf.get("zookeeper.znode.master", "master"));
clusterStateZNode = ZKUtil.joinZNode(baseZNode, clusterStateZNode = ZKUtil.joinZNode(baseZNode,

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test the draining servers feature.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-4298">HBASE-4298</a>
*/
public class TestDrainingServer {
private static final Log LOG = LogFactory.getLog(TestDrainingServer.class);
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private static final byte [] TABLENAME = Bytes.toBytes("t");
private static final byte [] FAMILY = Bytes.toBytes("f");
private static final int COUNT_OF_REGIONS = HBaseTestingUtility.KEYS.length;
/**
* Spin up a cluster with a bunch of regions on it.
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(5);
HTableDescriptor htd = new HTableDescriptor(TABLENAME);
htd.addFamily(new HColumnDescriptor(FAMILY));
TEST_UTIL.createMultiRegionsInMeta(TEST_UTIL.getConfiguration(), htd,
HBaseTestingUtility.KEYS);
// Make a mark for the table in the filesystem.
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
FSUtils.createTableDescriptor(fs, FSUtils.getRootDir(TEST_UTIL.getConfiguration()), htd);
// Assign out the regions we just created.
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
admin.disableTable(TABLENAME);
admin.enableTable(TABLENAME);
// Assert that every regionserver has some regions on it.
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
HRegionServer hrs = cluster.getRegionServer(i);
Assert.assertFalse(hrs.getOnlineRegions().isEmpty());
}
}
private static HRegionServer setDrainingServer(final HRegionServer hrs)
throws KeeperException {
LOG.info("Making " + hrs.getServerName() + " the draining server; " +
"it has " + hrs.getNumberOfOnlineRegions() + " online regions");
ZooKeeperWatcher zkw = hrs.getZooKeeper();
String hrsDrainingZnode =
ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString());
ZKUtil.createWithParents(zkw, hrsDrainingZnode);
return hrs;
}
private static HRegionServer unsetDrainingServer(final HRegionServer hrs)
throws KeeperException {
ZooKeeperWatcher zkw = hrs.getZooKeeper();
String hrsDrainingZnode =
ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString());
ZKUtil.deleteNode(zkw, hrsDrainingZnode);
return hrs;
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Test adding server to draining servers and then move regions off it.
* Make sure that no regions are moved back to the draining server.
* @throws IOException
* @throws KeeperException
*/
@Test // (timeout=30000)
public void testDrainingServerOffloading()
throws IOException, KeeperException {
// I need master in the below.
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
HRegionInfo hriToMoveBack = null;
// Set first server as draining server.
HRegionServer drainingServer =
setDrainingServer(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0));
try {
final int regionsOnDrainingServer =
drainingServer.getNumberOfOnlineRegions();
Assert.assertTrue(regionsOnDrainingServer > 0);
List<HRegionInfo> hris = drainingServer.getOnlineRegions();
for (HRegionInfo hri : hris) {
// Pass null and AssignmentManager will chose a random server BUT it
// should exclude draining servers.
master.move(hri.getEncodedNameAsBytes(), null);
// Save off region to move back.
hriToMoveBack = hri;
}
// Wait for regions to come back on line again.
waitForAllRegionsOnline();
Assert.assertEquals(0, drainingServer.getNumberOfOnlineRegions());
} finally {
unsetDrainingServer(drainingServer);
}
// Now we've unset the draining server, we should be able to move a region
// to what was the draining server.
master.move(hriToMoveBack.getEncodedNameAsBytes(),
Bytes.toBytes(drainingServer.getServerName().toString()));
// Wait for regions to come back on line again.
waitForAllRegionsOnline();
Assert.assertEquals(1, drainingServer.getNumberOfOnlineRegions());
}
/**
* Test that draining servers are ignored even after killing regionserver(s).
* Verify that the draining server is not given any of the dead servers regions.
* @throws KeeperException
* @throws IOException
*/
@Test (timeout=30000)
public void testDrainingServerWithAbort() throws KeeperException, IOException {
// Add first server to draining servers up in zk.
HRegionServer drainingServer =
setDrainingServer(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0));
try {
final int regionsOnDrainingServer =
drainingServer.getNumberOfOnlineRegions();
Assert.assertTrue(regionsOnDrainingServer > 0);
// Kill a few regionservers.
int aborted = 0;
final int numberToAbort = 2;
for (int i = 1; i < TEST_UTIL.getMiniHBaseCluster().countServedRegions(); i++) {
HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i);
if (hrs.getServerName().equals(drainingServer.getServerName())) continue;
hrs.abort("Aborting");
aborted++;
if (aborted >= numberToAbort) break;
}
// Wait for regions to come back on line again.
waitForAllRegionsOnline();
// Assert the draining server still has the same number of regions.
Assert.assertEquals(regionsOnDrainingServer,
drainingServer.getNumberOfOnlineRegions());
} finally {
unsetDrainingServer(drainingServer);
}
}
private void waitForAllRegionsOnline() {
while (TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().isRegionsInTransition()) {
Threads.sleep(10);
}
// Wait for regions to come back on line again.
while (!isAllRegionsOnline()) {
Threads.sleep(10);
}
}
private boolean isAllRegionsOnline() {
return TEST_UTIL.getMiniHBaseCluster().countServedRegions() ==
(COUNT_OF_REGIONS + 2 /*catalog regions*/);
}
}