HBASE-3159 Double play of OpenedRegionHandler for a single region and assorted fixes around this + TestRollingRestart added
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1028497 13f79535-47bb-0310-9956-ffa450edef68
parent 903a947a20
commit 3523b66eec
@@ -620,6 +620,9 @@ Release 0.21.0 - Unreleased
    HBASE-3155 HFile.appendMetaBlock() uses wrong comparator
               (Nicolas Spiegelberg via Stack)
    HBASE-3012 TOF doesn't take zk client port for remote clusters
+   HBASE-3159 Double play of OpenedRegionHandler for a single region
+              and assorted fixes around this + TestRollingRestart added
+
 
 IMPROVEMENTS
    HBASE-1760 Cleanup TODOs in HTable
@@ -238,6 +238,33 @@ public class LocalHBaseCluster {
     return regionServerThread.getName();
   }
+
+  /**
+   * Wait for the specified region server to stop
+   * Removes this thread from list of running threads.
+   * @param serverNumber
+   * @return Name of region server that just went down.
+   */
+  public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
+    while (rst.isAlive()) {
+      try {
+        LOG.info("Waiting on " +
+          rst.getRegionServer().getHServerInfo().toString());
+        rst.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+    for (int i=0;i<regionThreads.size();i++) {
+      if (regionThreads.get(i) == rst) {
+        regionThreads.remove(i);
+        break;
+      }
+    }
+    return rst.getName();
+  }
 
   /**
    * @param serverNumber
    * @return the HMaster thread
@@ -305,6 +332,31 @@ public class LocalHBaseCluster {
     return masterThread.getName();
   }
+
+  /**
+   * Wait for the specified master to stop
+   * Removes this thread from list of running threads.
+   * @param serverNumber
+   * @return Name of master that just went down.
+   */
+  public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
+    while (masterThread.isAlive()) {
+      try {
+        LOG.info("Waiting on " +
+          masterThread.getMaster().getServerName().toString());
+        masterThread.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+    for (int i=0;i<masterThreads.size();i++) {
+      if (masterThreads.get(i) == masterThread) {
+        masterThreads.remove(i);
+        break;
+      }
+    }
+    return masterThread.getName();
+  }
 
   /**
    * Wait for Mini HBase Cluster to shut down.
    * Presumes you've already called {@link #shutdown()}.
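For readers of this patch, the two new wait helpers are driven from the rolling-restart test added at the bottom of this commit. The sketch below is an illustration only (the helper name restartOneRegionServer is not part of the patch); it shows the intended call pattern against a running MiniHBaseCluster, using only calls that appear elsewhere in this diff.

// Illustrative use of the new wait helpers (pattern taken from TestRollingRestart below).
// `cluster` is a running MiniHBaseCluster; `hbaseCluster` is its backing LocalHBaseCluster.
static void restartOneRegionServer(MiniHBaseCluster cluster) throws Exception {
  JVMClusterUtil.RegionServerThread rst =
      cluster.getLiveRegionServerThreads().get(0);
  rst.getRegionServer().stop("Stopping RS during rolling restart");
  cluster.hbaseCluster.waitOnRegionServer(rst);   // blocks until the RS thread exits
  JVMClusterUtil.RegionServerThread restarted = cluster.startRegionServer();
  restarted.waitForServerOnline();                // wait for the replacement to come up
}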
@@ -427,6 +427,8 @@ public class CatalogTracker {
       Throwable cause = e.getCause();
       if (cause != null && cause instanceof EOFException) {
         t = cause;
+      } else if (cause.getMessage().contains("Connection reset")) {
+        t = cause;
       } else {
         throw e;
       }
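The new branch treats a wrapped "Connection reset" the same way as the EOF case. A hedged, self-contained sketch of that unwrap-and-classify idea follows; the helper class, its name, and the extra null checks are illustrative and not part of CatalogTracker.

// Illustrative only: classify a wrapped transport error the way the branch above does.
import java.io.EOFException;

final class CauseCheck {
  static boolean isTransientConnectionError(Exception e) {
    Throwable cause = e.getCause();
    if (cause instanceof EOFException) {
      return true;                         // wrapped EOF: the connection died
    }
    return cause != null && cause.getMessage() != null &&
        cause.getMessage().contains("Connection reset");
  }
}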
@@ -186,8 +186,6 @@ public class AssignmentManager extends ZooKeeperListener {
    // TODO: Check list of user regions and their assignments against regionservers.
    // TODO: Regions that have a null location and are not in regionsInTransitions
    // need to be handled.
    // TODO: Regions that are on servers that are not in our online list need
    // reassigning.

    // Scan META to build list of existing regions, servers, and assignment
    // Returns servers who have not checked in (assumed dead) and their regions
@@ -390,6 +388,7 @@ public class AssignmentManager extends ZooKeeperListener {
          return;
        }
        // Handle OPENED by removing from transition and deleted zk node
        regionState.update(RegionState.State.OPEN, data.getStamp());
        this.executorService.submit(
          new OpenedRegionHandler(master, this, data, regionState.getRegion(),
            this.serverManager.getServerInfo(data.getServerName())));
@@ -802,16 +801,19 @@ public class AssignmentManager extends ZooKeeperListener {
     try {
       LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() +
         " to " + plan.getDestination().getServerName());
-      // Send OPEN RPC. This can fail if the server on other end is is not up.
-      serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
       // Transition RegionState to PENDING_OPEN
       state.update(RegionState.State.PENDING_OPEN);
+      // Send OPEN RPC. This can fail if the server on other end is is not up.
+      serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
     } catch (Throwable t) {
       LOG.warn("Failed assignment of " +
         state.getRegion().getRegionNameAsString() + " to " +
         plan.getDestination() + ", trying to assign elsewhere instead", t);
       // Clean out plan we failed execute and one that doesn't look like it'll
       // succeed anyways; we need a new plan!
+      // Transition back to OFFLINE
+      state.update(RegionState.State.OFFLINE);
+      // Remove the plan
       this.regionPlans.remove(state.getRegion().getEncodedName());
       // Put in place a new plan and reassign. Calling getRegionPlan will add
       // a plan if none exists (We removed it in line above).
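The reordering above appears to make the master record PENDING_OPEN before the regionserver can act on the OPEN RPC, which is the kind of race behind the doubled OpenedRegionHandler named in the commit summary. A comment-only sketch of the intended ordering (names as in the hunk, not a new API):

// Ordering sketch (illustrative, mirrors the hunk above):
state.update(RegionState.State.PENDING_OPEN);            // 1. record intent in the master first
serverManager.sendRegionOpen(plan.getDestination(),      // 2. only then ask the RS to open,
    state.getRegion());                                  //    so an OPENED event cannot race a stale state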
@@ -982,9 +984,9 @@ public class AssignmentManager extends ZooKeeperListener {
         }
       } catch (NotServingRegionException nsre) {
         // Did not CLOSE, so set region offline and assign it
-        LOG.debug("Attempted to send CLOSE for region " +
-          region.getRegionNameAsString() + " but failed, setting region as " +
-          "OFFLINE and reassigning");
+        LOG.debug("Attempted to send CLOSE to " + regions.get(region) +
+          " for region " + region.getRegionNameAsString() + " but failed, " +
+          "setting region as OFFLINE and reassigning");
         synchronized (regionsInTransition) {
           forceRegionStateToOffline(region);
           assign(region);
@@ -994,6 +996,7 @@ public class AssignmentManager extends ZooKeeperListener {
        // St.Ack 20101012
        // I don't think IOE can happen anymore, only NSRE IOE is used here
        // should be able to remove this at least. jgray 20101024
        // I lied, we actually get RemoteException wrapping our NSRE, need to unwrap
        this.master.abort("Remote unexpected exception", e);
      } catch (Throwable t) {
        // For now call abort if unexpected exception -- radical, but will get fellas attention.
@@ -277,9 +277,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
     stopServiceThreads();
     // Stop services started for both backup and active masters
     if (this.activeMasterManager != null) this.activeMasterManager.stop();
-    this.catalogTracker.stop();
-    this.serverManager.stop();
-    this.assignmentManager.stop();
+    if (this.catalogTracker != null) this.catalogTracker.stop();
+    if (this.serverManager != null) this.serverManager.stop();
+    if (this.assignmentManager != null) this.assignmentManager.stop();
     HConnectionManager.deleteConnection(this.conf, true);
     this.zooKeeper.close();
   }
@@ -96,7 +96,7 @@ public class ServerManager {
   // Reporting to track master metrics.
   private final MasterMetrics metrics;
 
-  private final DeadServer deadservers = new DeadServer();
+  final DeadServer deadservers = new DeadServer();
 
   /**
    * Dumps into log current stats on dead servers and number of servers
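Loosening deadservers to package-private is what lets the new test poll the master's dead-server bookkeeping directly. A minimal sketch of that polling loop follows; the helper name waitForDeadServerProcessing is hypothetical, while the calls mirror waitForRSShutdownToStartAndFinish in TestRollingRestart below.

// Illustrative helper, not part of the patch: block until the master has noticed a
// stopped region server and finished its shutdown processing.
static void waitForDeadServerProcessing(HMaster master, String serverName)
    throws InterruptedException {
  ServerManager sm = master.getServerManager();
  while (!sm.deadservers.isDeadServer(serverName)) {
    Thread.sleep(100);   // wait for the master to list the server as dead
  }
  while (sm.deadservers.isDeadServer(serverName)) {
    Thread.sleep(100);   // wait for dead-server processing to complete
  }
}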
@@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;

/**
 * Handles CLOSED region event on Master.
@@ -88,7 +86,7 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
 
   @Override
   public void process() {
-    LOG.debug("Handling CLOSED event");
+    LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName());
     // Check if this table is being disabled or not
     if (assignmentManager.isTableOfRegionDisabled(regionInfo.getRegionName())) {
       assignmentManager.offlineDisabledRegion(regionInfo);
@@ -99,7 +99,8 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
         + "this table is disabled, triggering close of region");
       assignmentManager.unassign(regionInfo);
     } else {
-      LOG.debug("Opened region " + regionInfo.getRegionNameAsString());
+      LOG.debug("Opened region " + regionInfo.getRegionNameAsString() +
+        " on " + serverInfo.getServerName());
     }
   }
 }
@@ -425,6 +425,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
      } catch (Throwable t) {
        // Call stop if error or process will stick around for ever since server
        // puts up non-daemon threads.
        LOG.error("Stopping HRS because failed initialize", t);
        this.server.stop();
      }
    }
@@ -812,6 +813,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
    this.metrics = new RegionServerMetrics();
    startServiceThreads();
    LOG.info("Serving as " + this.serverInfo.getServerName() +
      ", RPC listening on " + this.server.getListenerAddress() +
      ", sessionid=0x" +
      Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
    isOnline = true;
@@ -244,11 +244,22 @@ public class ZKAssign {
    int version = ZKUtil.checkExists(zkw, node);
    if(version == -1) {
      ZKUtil.createAndWatch(zkw, node, data.getBytes());
      return true;
    } else {
      return ZKUtil.setData(zkw, node, data.getBytes(), version);
      if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) {
        return false;
      } else {
        // We successfully forced to OFFLINE, reset watch and handle if
        // the state changed in between our set and the watch
        RegionTransitionData curData =
          ZKAssign.getData(zkw, region.getEncodedName());
        if (curData.getEventType() != data.getEventType()) {
          // state changed, need to process
          return false;
        }
      }
    }
  }
  return true;
}

/**
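The new else-branch re-reads the znode after a successful setData so that a transition racing with the forced OFFLINE is reported back to the caller. A stripped-down sketch of that set-then-verify step follows; the helper name and parameter list are illustrative, while the individual calls are the ones used in the hunk above.

// Illustrative only: force the node's data, then read it back to detect a racing transition.
static boolean setAndVerify(ZooKeeperWatcher zkw, String node, HRegionInfo region,
    RegionTransitionData data, int version) throws KeeperException {
  if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) {
    return false;                 // someone else changed the node before our set
  }
  RegionTransitionData curData = ZKAssign.getData(zkw, region.getEncodedName());
  if (curData.getEventType() != data.getEventType()) {
    return false;                 // state moved on between our set and our read-back
  }
  return true;                    // node still holds the state we wrote
}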
@@ -404,6 +415,8 @@
           "after verifying it was in OPENED state, we got a version mismatch"));
       return false;
     }
+    LOG.debug(zkw.prefix("Successfully deleted unassigned node for region " +
+      regionName + " in expected state " + expectedState));
     return true;
   }
 }
@@ -745,6 +758,8 @@

  /**
   * Blocks until there are no node in regions in transition.
   * <p>
   * Used in testing only.
   * @param zkw zk reference
   * @throws KeeperException
   * @throws InterruptedException
@@ -759,7 +774,27 @@
           LOG.debug("ZK RIT -> " + znode);
         }
       }
-      Thread.sleep(200);
+      Thread.sleep(100);
     }
   }
+
+  /**
+   * Blocks until there is at least one node in regions in transition.
+   * <p>
+   * Used in testing only.
+   * @param zkw zk reference
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public static void blockUntilRIT(ZooKeeperWatcher zkw)
+  throws KeeperException, InterruptedException {
+    while (!ZKUtil.nodeHasChildren(zkw, zkw.assignmentZNode)) {
+      List<String> znodes =
+        ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.assignmentZNode);
+      if (znodes == null || znodes.isEmpty()) {
+        LOG.debug("No RIT in ZK");
+      }
+      Thread.sleep(100);
+    }
+  }
 
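Both RIT helpers are aimed at tests. A typical call site, following the disable/enable sequence in TestRollingRestart below, might look like the sketch here; the helper name cycleTable is illustrative.

// Illustrative test-side usage of the RIT helpers.
static void cycleTable(HBaseTestingUtility util, ZooKeeperWatcher zkw, byte [] table)
    throws Exception {
  util.getHBaseAdmin().disableTable(table);
  ZKAssign.blockUntilNoRIT(zkw);    // wait for all unassignments to be processed
  util.getHBaseAdmin().enableTable(table);
  // ZKAssign.blockUntilRIT(zkw) could be used here if a test needs to observe at least
  // one region in transition before waiting for the queue to drain again.
  ZKAssign.blockUntilNoRIT(zkw);    // wait for all reassignments to be processed
}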
@@ -1093,6 +1093,9 @@ public class ZKUtil {
       LOG.debug(zkw.prefix("Retrieved " + ((data == null)? 0: data.length) +
         " byte(s) of data from znode " + znode +
         (watcherSet? " and set watcher; ": "; data=") +
-        (data == null? "null": StringUtils.abbreviate(Bytes.toString(data), 32))));
+        (data == null? "null": (
+          znode.startsWith(zkw.assignmentZNode) ?
+            RegionTransitionData.fromBytes(data).toString()
+            : StringUtils.abbreviate(Bytes.toString(data), 32)))));
     }
   }
@@ -715,6 +715,30 @@ public class HBaseTestingUtility {
     return createMultiRegions(c, table, columnFamily, KEYS);
   }
 
+  /**
+   * Creates the specified number of regions in the specified table.
+   * @param c
+   * @param table
+   * @param columnFamily
+   * @param startKeys
+   * @return
+   * @throws IOException
+   */
+  public int createMultiRegions(final Configuration c, final HTable table,
+      final byte [] family, int numRegions)
+  throws IOException {
+    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
+    byte [] startKey = Bytes.toBytes("aaaaa");
+    byte [] endKey = Bytes.toBytes("zzzzz");
+    byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
+    byte [][] regionStartKeys = new byte[splitKeys.length+1][];
+    for (int i=0;i<splitKeys.length;i++) {
+      regionStartKeys[i+1] = splitKeys[i];
+    }
+    regionStartKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
+    return createMultiRegions(c, table, family, regionStartKeys);
+  }
+
   public int createMultiRegions(final Configuration c, final HTable table,
       final byte[] columnFamily, byte [][] startKeys)
   throws IOException {
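The new overload is what the rolling-restart test uses to lay a table out over a fixed number of regions. A minimal usage sketch follows; the helper name createRestartTable is illustrative, and the table and family names are simply the ones the test happens to use.

// Illustrative usage of the new createMultiRegions(Configuration, HTable, byte[], int) overload.
static int createRestartTable(HBaseTestingUtility util) throws Exception {
  byte [] table = Bytes.toBytes("tableRestart");
  byte [] family = Bytes.toBytes("family");
  HTable ht = util.createTable(table, family);
  // Splits the key space "aaaaa".."zzzzz" into 27 regions for this table.
  return util.createMultiRegions(util.getConfiguration(), ht, family, 27);
}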
@@ -0,0 +1,353 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.*;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;

/**
 * Tests the restarting of everything as done during rolling restarts.
 */
public class TestRollingRestart {
  private static final Log LOG = LogFactory.getLog(TestRollingRestart.class);

  @Test
  public void testBasicRollingRestart() throws Exception {

    // Start a cluster with 2 masters and 4 regionservers
    final int NUM_MASTERS = 2;
    final int NUM_RS = 3;
    final int NUM_REGIONS_TO_CREATE = 27;

    int expectedNumRS = 3;

    // Start the cluster
    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
    cluster.waitForActiveAndReadyMaster();
    Configuration conf = TEST_UTIL.getConfiguration();
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testRollingRestart",
        null);

    // Create a table with regions
    byte [] table = Bytes.toBytes("tableRestart");
    byte [] family = Bytes.toBytes("family");
    HTable ht = TEST_UTIL.createTable(table, family);
    int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family,
        NUM_REGIONS_TO_CREATE);
    numRegions += 2; // catalogs
    LOG.debug("\n\nWaiting for no more RIT\n");
    ZKAssign.blockUntilNoRIT(zkw);
    LOG.debug("\n\nDisabling table\n");
    TEST_UTIL.getHBaseAdmin().disableTable(table);
    LOG.debug("\n\nWaiting for no more RIT\n");
    ZKAssign.blockUntilNoRIT(zkw);
    LOG.debug("\n\nEnabling table\n");
    TEST_UTIL.getHBaseAdmin().enableTable(table);
    LOG.debug("\n\nWaiting for no more RIT\n");
    ZKAssign.blockUntilNoRIT(zkw);
    LOG.debug("\n\nVerifying there are " + numRegions + " assigned on cluster\n");
    NavigableSet<String> regions = getAllOnlineRegions(cluster);
    assertRegionsAssigned(cluster, regions);
    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // Add a new regionserver
    log("Adding a fourth RS");
    RegionServerThread restarted = cluster.startRegionServer();
    expectedNumRS++;
    restarted.waitForServerOnline();
    log("Additional RS is online");
    log("Waiting for no more RIT");
    ZKAssign.blockUntilNoRIT(zkw);
    log("Verifying there are " + numRegions + " assigned on cluster");
    assertRegionsAssigned(cluster, regions);
    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // Master Restarts
    List<MasterThread> masterThreads = cluster.getMasterThreads();
    MasterThread activeMaster = null;
    MasterThread backupMaster = null;
    assertEquals(2, masterThreads.size());
    if (masterThreads.get(0).getMaster().isActiveMaster()) {
      activeMaster = masterThreads.get(0);
      backupMaster = masterThreads.get(1);
    } else {
      activeMaster = masterThreads.get(1);
      backupMaster = masterThreads.get(0);
    }

    // Bring down the backup master
    LOG.debug("\n\nStopping backup master\n\n");
    backupMaster.getMaster().stop("Stop of backup during rolling restart");
    cluster.hbaseCluster.waitOnMaster(backupMaster);

    // Bring down the primary master
    LOG.debug("\n\nStopping primary master\n\n");
    activeMaster.getMaster().stop("Stop of active during rolling restart");
    cluster.hbaseCluster.waitOnMaster(activeMaster);

    // Start primary master
    LOG.debug("\n\nRestarting primary master\n\n");
    activeMaster = cluster.startMaster();
    cluster.waitForActiveAndReadyMaster();

    // Start backup master
    LOG.debug("\n\nRestarting backup master\n\n");
    backupMaster = cluster.startMaster();

    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // RegionServer Restarts

    // Bring them down, one at a time, waiting between each to complete
    List<RegionServerThread> regionServers =
      cluster.getLiveRegionServerThreads();
    int num = 1;
    int total = regionServers.size();
    for (RegionServerThread rst : regionServers) {
      String serverName = rst.getRegionServer().getServerName();
      log("Stopping region server " + num + " of " + total + " [ " +
          serverName + "]");
      rst.getRegionServer().stop("Stopping RS during rolling restart");
      cluster.hbaseCluster.waitOnRegionServer(rst);
      log("Waiting for RS shutdown to be handled by master");
      waitForRSShutdownToStartAndFinish(activeMaster, serverName);
      log("RS shutdown done, waiting for no more RIT");
      ZKAssign.blockUntilNoRIT(zkw);
      log("Verifying there are " + numRegions + " assigned on cluster");
      assertRegionsAssigned(cluster, regions);
      expectedNumRS--;
      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
      log("Restarting region server " + num + " of " + total);
      restarted = cluster.startRegionServer();
      restarted.waitForServerOnline();
      expectedNumRS++;
      log("Region server " + num + " is back online");
      log("Waiting for no more RIT");
      ZKAssign.blockUntilNoRIT(zkw);
      log("Verifying there are " + numRegions + " assigned on cluster");
      assertRegionsAssigned(cluster, regions);
      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
      num++;
    }
    Thread.sleep(2000);
    assertRegionsAssigned(cluster, regions);

    // Bring the RS hosting ROOT down and the RS hosting META down at once
    RegionServerThread rootServer = getServerHostingRoot(cluster);
    RegionServerThread metaServer = getServerHostingMeta(cluster);
    if (rootServer == metaServer) {
      log("ROOT and META on the same server so killing another random server");
      int i=0;
      while (rootServer == metaServer) {
        metaServer = cluster.getRegionServerThreads().get(i);
        i++;
      }
    }
    log("Stopping server hosting ROOT");
    rootServer.getRegionServer().stop("Stopping ROOT server");
    log("Stopping server hosting META #1");
    metaServer.getRegionServer().stop("Stopping META server");
    cluster.hbaseCluster.waitOnRegionServer(rootServer);
    log("Root server down");
    cluster.hbaseCluster.waitOnRegionServer(metaServer);
    log("Meta server down #1");
    expectedNumRS -= 2;
    log("Waiting for meta server #1 RS shutdown to be handled by master");
    waitForRSShutdownToStartAndFinish(activeMaster,
        metaServer.getRegionServer().getServerName());
    log("Waiting for no more RIT");
    ZKAssign.blockUntilNoRIT(zkw);
    log("Verifying there are " + numRegions + " assigned on cluster");
    assertRegionsAssigned(cluster, regions);
    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // Kill off the server hosting META again
    metaServer = getServerHostingMeta(cluster);
    log("Stopping server hosting META #2");
    metaServer.getRegionServer().stop("Stopping META server");
    cluster.hbaseCluster.waitOnRegionServer(metaServer);
    log("Meta server down");
    expectedNumRS--;
    log("Waiting for RS shutdown to be handled by master");
    waitForRSShutdownToStartAndFinish(activeMaster,
        metaServer.getRegionServer().getServerName());
    log("RS shutdown done, waiting for no more RIT");
    ZKAssign.blockUntilNoRIT(zkw);
    log("Verifying there are " + numRegions + " assigned on cluster");
    assertRegionsAssigned(cluster, regions);
    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // Start 3 RS again
    cluster.startRegionServer().waitForServerOnline();
    cluster.startRegionServer().waitForServerOnline();
    cluster.startRegionServer().waitForServerOnline();
    Thread.sleep(1000);
    log("Waiting for no more RIT");
    ZKAssign.blockUntilNoRIT(zkw);
    log("Verifying there are " + numRegions + " assigned on cluster");
    assertRegionsAssigned(cluster, regions);
    // Shutdown server hosting META
    metaServer = getServerHostingMeta(cluster);
    log("Stopping server hosting META (1 of 3)");
    metaServer.getRegionServer().stop("Stopping META server");
    cluster.hbaseCluster.waitOnRegionServer(metaServer);
    log("Meta server down (1 of 3)");
    log("Waiting for RS shutdown to be handled by master");
    waitForRSShutdownToStartAndFinish(activeMaster,
        metaServer.getRegionServer().getServerName());
    log("RS shutdown done, waiting for no more RIT");
    ZKAssign.blockUntilNoRIT(zkw);
    log("Verifying there are " + numRegions + " assigned on cluster");
    assertRegionsAssigned(cluster, regions);

    // Shutdown server hosting META again
    metaServer = getServerHostingMeta(cluster);
    log("Stopping server hosting META (2 of 3)");
    metaServer.getRegionServer().stop("Stopping META server");
    cluster.hbaseCluster.waitOnRegionServer(metaServer);
    log("Meta server down (2 of 3)");
    log("Waiting for RS shutdown to be handled by master");
    waitForRSShutdownToStartAndFinish(activeMaster,
        metaServer.getRegionServer().getServerName());
    log("RS shutdown done, waiting for no more RIT");
    ZKAssign.blockUntilNoRIT(zkw);
    log("Verifying there are " + numRegions + " assigned on cluster");
    assertRegionsAssigned(cluster, regions);

    // Shutdown server hosting META again
    metaServer = getServerHostingMeta(cluster);
    log("Stopping server hosting META (3 of 3)");
    metaServer.getRegionServer().stop("Stopping META server");
    cluster.hbaseCluster.waitOnRegionServer(metaServer);
    log("Meta server down (3 of 3)");
    log("Waiting for RS shutdown to be handled by master");
    waitForRSShutdownToStartAndFinish(activeMaster,
        metaServer.getRegionServer().getServerName());
    log("RS shutdown done, waiting for no more RIT");
    ZKAssign.blockUntilNoRIT(zkw);
    log("Verifying there are " + numRegions + " assigned on cluster");
    assertRegionsAssigned(cluster, regions);

    if (cluster.getRegionServerThreads().size() != 1) {
      log("Online regionservers:");
      for (RegionServerThread rst : cluster.getRegionServerThreads()) {
        log("RS: " + rst.getRegionServer().getServerName());
      }
    }
    assertEquals(1, cluster.getRegionServerThreads().size());


    // TODO: Bring random 3 of 4 RS down at the same time


    // Stop the cluster
    TEST_UTIL.shutdownMiniCluster();
  }

  private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster,
      String serverName) throws InterruptedException {
    ServerManager sm = activeMaster.getMaster().getServerManager();
    // First wait for it to be in dead list
    while (!sm.deadservers.isDeadServer(serverName)) {
      log("Waiting for [" + serverName + "] to be listed as dead in master");
      Thread.sleep(100);
    }
    log("Server [" + serverName + "] marked as dead, waiting for it to " +
        "finish dead processing");
    while (sm.deadservers.isDeadServer(serverName)) {
      log("Server [" + serverName + "] still marked as dead, waiting");
      Thread.sleep(100);
    }
    log("Server [" + serverName + "] done with server shutdown processing");
  }

  private void log(String msg) {
    LOG.debug("\n\n" + msg + "\n");
  }

  private RegionServerThread getServerHostingMeta(MiniHBaseCluster cluster) {
    return getServerHosting(cluster, HRegionInfo.FIRST_META_REGIONINFO);
  }

  private RegionServerThread getServerHostingRoot(MiniHBaseCluster cluster) {
    return getServerHosting(cluster, HRegionInfo.ROOT_REGIONINFO);
  }

  private RegionServerThread getServerHosting(MiniHBaseCluster cluster,
      HRegionInfo region) {
    for (RegionServerThread rst : cluster.getRegionServerThreads()) {
      if (rst.getRegionServer().getOnlineRegions().contains(region)) {
        return rst;
      }
    }
    return null;
  }

  private void assertRegionsAssigned(MiniHBaseCluster cluster,
      Set<String> expectedRegions) {
    int numFound = 0;
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      numFound += rst.getRegionServer().getNumberOfOnlineRegions();
    }
    if (expectedRegions.size() != numFound) {
      LOG.debug("Expected to find " + expectedRegions.size() + " but only found"
          + " " + numFound);
      NavigableSet<String> foundRegions = getAllOnlineRegions(cluster);
      for (String region : expectedRegions) {
        if (!foundRegions.contains(region)) {
          LOG.debug("Missing region: " + region);
        }
      }
      assertEquals(expectedRegions.size(), numFound);
    } else {
      log("Success! Found expected number of " + numFound + " regions");
    }
  }

  private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster) {
    NavigableSet<String> online = new TreeSet<String>();
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      for (HRegionInfo region : rst.getRegionServer().getOnlineRegions()) {
        online.add(region.getRegionNameAsString());
      }
    }
    return online;
  }

}
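For completeness, the new test is a plain JUnit 4 test, so besides the project's normal build-driven test run it can be launched programmatically; a hedged sketch follows (the runner class below is not part of the patch).

// Illustrative stand-alone runner for the new test; normally the build's test target is used.
import org.junit.runner.JUnitCore;
import org.junit.runner.Result;

public class RunRollingRestart {
  public static void main(String[] args) {
    Result result = JUnitCore.runClasses(
        org.apache.hadoop.hbase.master.TestRollingRestart.class);
    System.out.println("Ran " + result.getRunCount() + " test(s), failures: " +
        result.getFailureCount());
  }
}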