HBASE-16853 Regions are assigned to Region Servers in /hbase/draining after HBase Master failover (David Pope)
This commit is contained in:
parent
0f384158fc
commit
109db38b6a
|
@ -524,7 +524,6 @@ public class ServerManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public DeadServer getDeadServers() {
|
public DeadServer getDeadServers() {
|
||||||
return this.deadservers;
|
return this.deadservers;
|
||||||
}
|
}
|
||||||
|
@ -734,6 +733,7 @@ public class ServerManager {
|
||||||
"Ignoring request to add it again.");
|
"Ignoring request to add it again.");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
LOG.info("Server " + sn + " added to draining server list.");
|
||||||
return this.drainingServers.add(sn);
|
return this.drainingServers.add(sn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.master.ServerListener;
|
||||||
import org.apache.hadoop.hbase.master.ServerManager;
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
|
@ -68,6 +69,23 @@ public class DrainingServerTracker extends ZooKeeperListener {
|
||||||
*/
|
*/
|
||||||
public void start() throws KeeperException, IOException {
|
public void start() throws KeeperException, IOException {
|
||||||
watcher.registerListener(this);
|
watcher.registerListener(this);
|
||||||
|
// Add a ServerListener to check if a server is draining when it's added.
|
||||||
|
serverManager.registerListener(
|
||||||
|
new ServerListener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serverAdded(ServerName sn) {
|
||||||
|
if (drainingServers.contains(sn)){
|
||||||
|
serverManager.addServerToDrainList(sn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serverRemoved(ServerName serverName) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
List<String> servers =
|
List<String> servers =
|
||||||
ZKUtil.listChildrenAndWatchThem(watcher, watcher.znodePaths.drainingZNode);
|
ZKUtil.listChildrenAndWatchThem(watcher, watcher.znodePaths.drainingZNode);
|
||||||
add(servers);
|
add(servers);
|
||||||
|
|
|
@ -21,29 +21,41 @@ package org.apache.hadoop.hbase.master;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.ServerLoad;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.Region;
|
import org.apache.hadoop.hbase.regionserver.Region;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
@Category({MasterTests.class, MediumTests.class})
|
@Category({MasterTests.class, MediumTests.class})
|
||||||
public class TestAssignmentListener {
|
public class TestAssignmentListener {
|
||||||
|
@ -51,6 +63,16 @@ public class TestAssignmentListener {
|
||||||
|
|
||||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static final Abortable abortable = new Abortable() {
|
||||||
|
@Override
|
||||||
|
public boolean isAborted() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void abort(String why, Throwable e) {
|
||||||
|
}
|
||||||
|
};
|
||||||
static class DummyListener {
|
static class DummyListener {
|
||||||
protected AtomicInteger modified = new AtomicInteger(0);
|
protected AtomicInteger modified = new AtomicInteger(0);
|
||||||
|
|
||||||
|
@ -260,4 +282,81 @@ public class TestAssignmentListener {
|
||||||
am.unregisterListener(listener);
|
am.unregisterListener(listener);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddNewServerThatExistsInDraining() throws Exception {
|
||||||
|
// Under certain circumstances, such as when we failover to the Backup
|
||||||
|
// HMaster, the DrainingServerTracker is started with existing servers in
|
||||||
|
// draining before all of the Region Servers register with the
|
||||||
|
// ServerManager as "online". This test is to ensure that Region Servers
|
||||||
|
// are properly added to the ServerManager.drainingServers when they
|
||||||
|
// register with the ServerManager under these circumstances.
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
ZooKeeperWatcher zooKeeper = new ZooKeeperWatcher(conf,
|
||||||
|
"zkWatcher-NewServerDrainTest", abortable, true);
|
||||||
|
String baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
|
||||||
|
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
|
||||||
|
String drainingZNode = ZKUtil.joinZNode(baseZNode,
|
||||||
|
conf.get("zookeeper.znode.draining.rs", "draining"));
|
||||||
|
|
||||||
|
HMaster master = Mockito.mock(HMaster.class);
|
||||||
|
Mockito.when(master.getConfiguration()).thenReturn(conf);
|
||||||
|
|
||||||
|
ServerName SERVERNAME_A = ServerName.valueOf("mockserverbulk_a.org", 1000, 8000);
|
||||||
|
ServerName SERVERNAME_B = ServerName.valueOf("mockserverbulk_b.org", 1001, 8000);
|
||||||
|
ServerName SERVERNAME_C = ServerName.valueOf("mockserverbulk_c.org", 1002, 8000);
|
||||||
|
|
||||||
|
// We'll start with 2 servers in draining that existed before the
|
||||||
|
// HMaster started.
|
||||||
|
ArrayList<ServerName> drainingServers = new ArrayList<ServerName>();
|
||||||
|
drainingServers.add(SERVERNAME_A);
|
||||||
|
drainingServers.add(SERVERNAME_B);
|
||||||
|
|
||||||
|
// We'll have 2 servers that come online AFTER the DrainingServerTracker
|
||||||
|
// is started (just as we see when we failover to the Backup HMaster).
|
||||||
|
// One of these will already be a draining server.
|
||||||
|
HashMap<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
|
||||||
|
onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
|
||||||
|
onlineServers.put(SERVERNAME_C, ServerLoad.EMPTY_SERVERLOAD);
|
||||||
|
|
||||||
|
// Create draining znodes for the draining servers, which would have been
|
||||||
|
// performed when the previous HMaster was running.
|
||||||
|
for (ServerName sn : drainingServers) {
|
||||||
|
String znode = ZKUtil.joinZNode(drainingZNode, sn.getServerName());
|
||||||
|
ZKUtil.createAndFailSilent(zooKeeper, znode);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now, we follow the same order of steps that the HMaster does to setup
|
||||||
|
// the ServerManager, RegionServerTracker, and DrainingServerTracker.
|
||||||
|
ServerManager serverManager = new ServerManager(master);
|
||||||
|
|
||||||
|
RegionServerTracker regionServerTracker = new RegionServerTracker(
|
||||||
|
zooKeeper, master, serverManager);
|
||||||
|
regionServerTracker.start();
|
||||||
|
|
||||||
|
DrainingServerTracker drainingServerTracker = new DrainingServerTracker(
|
||||||
|
zooKeeper, master, serverManager);
|
||||||
|
drainingServerTracker.start();
|
||||||
|
|
||||||
|
// Confirm our ServerManager lists are empty.
|
||||||
|
Assert.assertEquals(serverManager.getOnlineServers(),
|
||||||
|
new HashMap<ServerName, ServerLoad>());
|
||||||
|
Assert.assertEquals(serverManager.getDrainingServersList(),
|
||||||
|
new ArrayList<ServerName>());
|
||||||
|
|
||||||
|
// checkAndRecordNewServer() is how servers are added to the ServerManager.
|
||||||
|
ArrayList<ServerName> onlineDrainingServers = new ArrayList<ServerName>();
|
||||||
|
for (ServerName sn : onlineServers.keySet()){
|
||||||
|
// Here's the actual test.
|
||||||
|
serverManager.checkAndRecordNewServer(sn, onlineServers.get(sn));
|
||||||
|
if (drainingServers.contains(sn)){
|
||||||
|
onlineDrainingServers.add(sn); // keeping track for later verification
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the ServerManager lists are correctly updated.
|
||||||
|
Assert.assertEquals(serverManager.getOnlineServers(), onlineServers);
|
||||||
|
Assert.assertEquals(serverManager.getDrainingServersList(),
|
||||||
|
onlineDrainingServers);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue