HBASE-6046 Master retry on ZK session expiry causes inconsistent region assignments. (Asutosh)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1346460 13f79535-47bb-0310-9956-ffa450edef68
commit fc4d7d7a59
parent 52b6797947
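For context on the change below: session expiry is reported to a ZooKeeper client as a watcher event, and once it fires every ephemeral node and watch tied to the old session is gone, so the master must rebuild that state rather than resume where it left off. A minimal sketch of that detection path using the plain ZooKeeper API (not HBase code; the RecoveryHandler callback is an illustrative stand-in for what HMaster does in its recovery path):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class ExpiryAwareWatcher implements Watcher {

  /** Illustrative callback; in HBase the HMaster itself plays this role. */
  public interface RecoveryHandler {
    void recoverFromExpiredSession();
  }

  private final RecoveryHandler handler;

  public ExpiryAwareWatcher(RecoveryHandler handler) {
    this.handler = handler;
  }

  @Override
  public void process(WatchedEvent event) {
    // Session events carry EventType.None; Expired means the server has already
    // discarded the session, together with its ephemeral nodes and watches.
    if (event.getType() == Event.EventType.None
        && event.getState() == Event.KeeperState.Expired) {
      handler.recoverFromExpiredSession();
    }
  }
}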
@@ -406,7 +406,7 @@ Server {
       // We are either the active master or we were asked to shutdown
       if (!this.stopped) {
-        finishInitialization(startupStatus);
+        finishInitialization(startupStatus, false);
         loop();
       }
     } catch (Throwable t) {
@@ -549,9 +549,9 @@ Server {
     }
   }
 
   /**
    * Finish initialization of HMaster after becoming the primary master.
    *
    * <ol>
    * <li>Initialize master components - file system manager, server manager,
    * assignment manager, region server tracker, catalog tracker, etc</li>
@@ -563,12 +563,14 @@ Server {
    * <li>Ensure assignment of root and meta regions<li>
    * <li>Handle either fresh cluster start or master failover</li>
    * </ol>
    *
+   * @param masterRecovery
+   *
    * @throws IOException
    * @throws InterruptedException
    * @throws KeeperException
    */
-  private void finishInitialization(MonitoredTask status)
+  private void finishInitialization(MonitoredTask status, boolean masterRecovery)
   throws IOException, InterruptedException, KeeperException {
 
     isActiveMaster = true;
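The new masterRecovery parameter splits finishInitialization() into work that is safe to repeat after a session expiry (rebuilding the ZK-based trackers) and work that must happen only once per master process (creating the executor service, server manager, coprocessor host, chores, and so on). A standalone sketch of that pattern, with placeholder fields rather than the real HBase components:

// Standalone sketch, not HBase code: a re-runnable initialization routine where a
// recovery flag skips the components that must only be created once per process.
public class RecoverableInit {

  private Object executorService;  // stand-ins for ExecutorService, ServerManager, ...
  private Object serverManager;

  public void finishInitialization(boolean masterRecovery) {
    if (!masterRecovery) {
      // Created exactly once, on the fresh-startup path.
      executorService = new Object();
      serverManager = new Object();
    }
    // Always redone: the old ZK session (and its watches/ephemeral nodes) is gone,
    // so ZK-based trackers have to be rebuilt on every call.
    initializeZkTrackers();
  }

  private void initializeZkTrackers() {
    // re-register watchers, trackers, ...
  }

  public static void main(String[] args) {
    RecoverableInit master = new RecoverableInit();
    master.finishInitialization(false); // normal startup
    master.finishInitialization(true);  // re-run after ZK session expiry
  }
}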
@@ -582,7 +584,7 @@ Server {
     status.setStatus("Initializing Master file system");
     this.masterActiveTime = System.currentTimeMillis();
     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
-    this.fileSystemManager = new MasterFileSystem(this, this, metrics);
+    this.fileSystemManager = new MasterFileSystem(this, this, metrics, masterRecovery);
 
     this.tableDescriptors =
       new FSTableDescriptors(this.fileSystemManager.getFileSystem(),
@@ -592,20 +594,23 @@ Server {
     status.setStatus("Publishing Cluster ID in ZooKeeper");
     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
 
-    this.executorService = new ExecutorService(getServerName().toString());
-    this.serverManager = createServerManager(this, this);
+    if (!masterRecovery) {
+      this.executorService = new ExecutorService(getServerName().toString());
+      this.serverManager = createServerManager(this, this);
+    }
 
     status.setStatus("Initializing ZK system trackers");
     initializeZKBasedSystemTrackers();
 
-    // initialize master side coprocessors before we start handling requests
-    status.setStatus("Initializing master coprocessors");
-    this.cpHost = new MasterCoprocessorHost(this, this.conf);
-
-    // start up all service threads.
-    status.setStatus("Initializing master service threads");
-    startServiceThreads();
+    if (!masterRecovery) {
+      // initialize master side coprocessors before we start handling requests
+      status.setStatus("Initializing master coprocessors");
+      this.cpHost = new MasterCoprocessorHost(this, this.conf);
+
+      // start up all service threads.
+      status.setStatus("Initializing master service threads");
+      startServiceThreads();
+    }
 
     // Wait for region servers to report in.
     this.serverManager.waitForRegionServers(status);
@@ -619,7 +624,9 @@ Server {
       }
     }
 
-    this.assignmentManager.startTimeOutMonitor();
+    if (!masterRecovery) {
+      this.assignmentManager.startTimeOutMonitor();
+    }
     // TODO: Should do this in background rather than block master startup
     status.setStatus("Splitting logs after master startup");
     splitLogAfterStartup(this.fileSystemManager);
@@ -647,14 +654,16 @@ Server {
     status.setStatus("Fixing up missing daughters");
     fixupDaughters(status);
 
-    // Start balancer and meta catalog janitor after meta and regions have
-    // been assigned.
-    status.setStatus("Starting balancer and catalog janitor");
-    this.balancerChore = getAndStartBalancerChore(this);
-    this.catalogJanitorChore = new CatalogJanitor(this, this);
-    startCatalogJanitorChore();
-
-    registerMBean();
+    if (!masterRecovery) {
+      // Start balancer and meta catalog janitor after meta and regions have
+      // been assigned.
+      status.setStatus("Starting balancer and catalog janitor");
+      this.balancerChore = getAndStartBalancerChore(this);
+      this.catalogJanitorChore = new CatalogJanitor(this, this);
+      startCatalogJanitorChore();
+
+      registerMBean();
+    }
 
     status.markComplete("Initialization successful");
     LOG.info("Master has completed initialization");
@@ -664,12 +673,14 @@ Server {
     // master initialization. See HBASE-5916.
     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
 
-    if (this.cpHost != null) {
-      // don't let cp initialization errors kill the master
-      try {
-        this.cpHost.postStartMaster();
-      } catch (IOException ioe) {
-        LOG.error("Coprocessor postStartMaster() hook failed", ioe);
+    if (!masterRecovery) {
+      if (this.cpHost != null) {
+        // don't let cp initialization errors kill the master
+        try {
+          this.cpHost.postStartMaster();
+        } catch (IOException ioe) {
+          LOG.error("Coprocessor postStartMaster() hook failed", ioe);
+        }
       }
     }
   }
@@ -1750,13 +1761,9 @@ Server {
       if (!becomeActiveMaster(status)) {
         return Boolean.FALSE;
       }
-      initializeZKBasedSystemTrackers();
-      // Update in-memory structures to reflect our earlier Root/Meta assignment.
-      assignRootAndMeta(status);
-      // process RIT if any
-      // TODO: Why does this not call AssignmentManager.joinCluster? Otherwise
-      // we are not processing dead servers if any.
-      assignmentManager.processDeadServersAndRegionsInTransition();
+      serverShutdownHandlerEnabled = false;
+      initialized = false;
+      finishInitialization(status, true);
       return Boolean.TRUE;
     } finally {
       status.cleanup();
@@ -85,7 +85,7 @@ public class MasterFileSystem {
   private final MasterServices services;
 
   public MasterFileSystem(Server master, MasterServices services,
-      MasterMetrics metrics)
+      MasterMetrics metrics, boolean masterRecovery)
   throws IOException {
     this.conf = master.getConfiguration();
     this.master = master;
@@ -107,7 +107,7 @@ public class MasterFileSystem {
     if (this.distributedLogSplitting) {
       this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
         master.getConfiguration(), master, master.getServerName());
-      this.splitLogManager.finishInitialization();
+      this.splitLogManager.finishInitialization(masterRecovery);
     } else {
       this.splitLogManager = null;
     }
@@ -182,9 +182,11 @@ public class SplitLogManager extends ZooKeeperListener {
         new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
   }
 
-  public void finishInitialization() {
-    Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName +
-      ".splitLogManagerTimeoutMonitor");
+  public void finishInitialization(boolean masterRecovery) {
+    if (!masterRecovery) {
+      Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
+          + ".splitLogManagerTimeoutMonitor");
+    }
     // Watcher can be null during tests with Mock'd servers.
     if (this.watcher != null) {
       this.watcher.registerListener(this);
@@ -1207,4 +1209,12 @@ public class SplitLogManager extends ZooKeeperListener {
       return statusMsg;
     }
   }
+
+  /**
+   * Completes the initialization
+   */
+  public void finishInitialization() {
+    finishInitialization(false);
+  }
+
 }
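Keeping the original no-argument SplitLogManager.finishInitialization() as a thin delegate means existing callers and tests compile unchanged while the recovery-aware overload carries the new behaviour. A tiny illustrative sketch of the same overload-delegation shape (toy class, not HBase code):

// Toy illustration of the overload-delegation pattern; not the real SplitLogManager.
public class Initializer {

  public void finishInitialization() {
    finishInitialization(false);  // old entry point keeps its old meaning
  }

  public void finishInitialization(boolean masterRecovery) {
    if (!masterRecovery) {
      startTimeoutMonitorThread(); // daemon thread is started once, at first init only
    }
    registerZkListeners();         // always redone after a new ZK session
  }

  private void startTimeoutMonitorThread() { }

  private void registerZkListeners() { }
}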
@@ -28,6 +28,9 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.List;
+
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,9 +41,15 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -56,6 +65,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+
+
 @Category(LargeTests.class)
 public class TestZooKeeper {
   private final Log LOG = LogFactory.getLog(this.getClass());
@@ -73,6 +84,8 @@ public class TestZooKeeper {
     TEST_UTIL.startMiniZKCluster();
     conf.setBoolean("dfs.support.append", true);
     conf.setInt(HConstants.ZOOKEEPER_SESSION_TIMEOUT, 1000);
+    conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MockLoadBalancer.class,
+      LoadBalancer.class);
     TEST_UTIL.startMiniCluster(2);
   }
 
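The setup above swaps in MockLoadBalancer through the configuration before the mini cluster starts, so the master instantiates the test double instead of the default balancer. A minimal sketch of that configuration-driven injection, assuming the standard Hadoop Configuration/ReflectionUtils APIs; the key name and Component types here are illustrative, not HBase's (the test itself uses HConstants.HBASE_MASTER_LOADBALANCER_CLASS):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

public class ConfInjectionSketch {

  public interface Component { void run(); }

  public static class DefaultComponent implements Component {
    @Override
    public void run() { System.out.println("default implementation"); }
  }

  public static class TestComponent implements Component {
    @Override
    public void run() { System.out.println("test double"); }
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The test registers its double under a config key before the cluster starts.
    conf.setClass("my.component.impl", TestComponent.class, Component.class);

    // Production code later instantiates whatever class the configuration names.
    Class<? extends Component> impl =
        conf.getClass("my.component.impl", DefaultComponent.class, Component.class);
    Component component = ReflectionUtils.newInstance(impl, conf);
    component.run(); // prints "test double"
  }
}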
@@ -362,6 +375,94 @@ public class TestZooKeeper {
     ZKUtil.getChildDataAndWatchForNewChildren(zkw, "/wrongNode");
   }
 
+  /**
+   * Tests that the master does not call retainAssignment after recovery from expired zookeeper
+   * session. Without the HBASE-6046 fix master always tries to assign all the user regions by
+   * calling retainAssignment.
+   */
+  @Test
+  public void testRegionAssignmentAfterMasterRecoveryDueToZKExpiry() throws Exception {
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    cluster.startRegionServer();
+    HMaster m = cluster.getMaster();
+    // now the cluster is up. So assign some regions.
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"),
+        Bytes.toBytes("c"), Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
+        Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j") };
+
+    String tableName = "testRegionAssignmentAfterMasterRecoveryDueToZKExpiry";
+    admin.createTable(new HTableDescriptor(tableName), SPLIT_KEYS);
+    ZooKeeperWatcher zooKeeperWatcher = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL);
+    ZKAssign.blockUntilNoRIT(zooKeeperWatcher);
+    m.getZooKeeperWatcher().close();
+    MockLoadBalancer.retainAssignCalled = false;
+    m.abort("Test recovery from zk session expired", new KeeperException.SessionExpiredException());
+    assertFalse(m.isStopped());
+    // The recovered master should not call retainAssignment, as it is not a
+    // clean startup.
+    assertFalse("Retain assignment should not be called", MockLoadBalancer.retainAssignCalled);
+  }
+
+  /**
+   * Tests whether the logs are split when master recovers from a expired zookeeper session and an
+   * RS goes down.
+   */
+  @Test(timeout = 60000)
+  public void testLogSplittingAfterMasterRecoveryDueToZKExpiry() throws IOException,
+      KeeperException, InterruptedException {
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    cluster.startRegionServer();
+    HMaster m = cluster.getMaster();
+    // now the cluster is up. So assign some regions.
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("1"), Bytes.toBytes("2"),
+        Bytes.toBytes("3"), Bytes.toBytes("4"), Bytes.toBytes("5") };
+
+    String tableName = "testLogSplittingAfterMasterRecoveryDueToZKExpiry";
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor("col");
+    htd.addFamily(hcd);
+    admin.createTable(htd, SPLIT_KEYS);
+    ZooKeeperWatcher zooKeeperWatcher = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL);
+    ZKAssign.blockUntilNoRIT(zooKeeperWatcher);
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+
+    Put p = null;
+    int numberOfPuts = 0;
+    for (numberOfPuts = 0; numberOfPuts < 6; numberOfPuts++) {
+      p = new Put(Bytes.toBytes(numberOfPuts));
+      p.add(Bytes.toBytes("col"), Bytes.toBytes("ql"), Bytes.toBytes("value" + numberOfPuts));
+      table.put(p);
+    }
+    m.getZooKeeperWatcher().close();
+    m.abort("Test recovery from zk session expired", new KeeperException.SessionExpiredException());
+    assertFalse(m.isStopped());
+    cluster.getRegionServer(0).abort("Aborting");
+    // Without patch for HBASE-6046 this test case will always timeout
+    // with patch the test case should pass.
+    Scan scan = new Scan();
+    int numberOfRows = 0;
+    ResultScanner scanner = table.getScanner(scan);
+    Result[] result = scanner.next(1);
+    while (result != null && result.length > 0) {
+      numberOfRows++;
+      result = scanner.next(1);
+    }
+    assertEquals("Number of rows should be equal to number of puts.", numberOfPuts, numberOfRows);
+  }
+
+  static class MockLoadBalancer extends DefaultLoadBalancer {
+    static boolean retainAssignCalled = false;
+
+    @Override
+    public Map<ServerName, List<HRegionInfo>> retainAssignment(
+        Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
+      retainAssignCalled = true;
+      return super.retainAssignment(regions, servers);
+    }
+  }
+
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
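The MockLoadBalancer above is a "recording" subclass: a static flag notes whether retainAssignment was ever invoked, which lets the test assert that the recovered master skipped the fresh-startup assignment path. The same technique in a self-contained toy form (plain Java, no HBase types):

import java.util.Collections;
import java.util.List;
import java.util.Map;

// Plain-Java stand-ins for the balancer; only the recording pattern matters here.
class LoadBalancerStub {
  public Map<String, List<String>> retainAssignment(Map<String, String> regions,
      List<String> servers) {
    return Collections.emptyMap();
  }
}

class RecordingLoadBalancer extends LoadBalancerStub {
  // Static so the test can reset it before the action and assert on it afterwards.
  static volatile boolean retainAssignCalled = false;

  @Override
  public Map<String, List<String>> retainAssignment(Map<String, String> regions,
      List<String> servers) {
    retainAssignCalled = true;  // record the call, then defer to the real logic
    return super.retainAssignment(regions, servers);
  }
}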
@@ -165,7 +165,7 @@ public class TestCatalogJanitor {
     private final AssignmentManager asm;
 
     MockMasterServices(final Server server) throws IOException {
-      this.mfs = new MasterFileSystem(server, this, null);
+      this.mfs = new MasterFileSystem(server, this, null, false);
       this.asm = Mockito.mock(AssignmentManager.class);
     }
 