HBASE-1311 ZooKeeperWrapper: Failed to set watcher on ZNode /hbase/master
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@773677 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3867c4c9e4
commit
135daf0c41
|
@ -121,6 +121,8 @@ Release 0.20.0 - Unreleased
|
|||
(Ryan Rawson via Stack)
|
||||
HBASE-1399 Can't drop tables since HBASE-1398 (Ryan Rawson via Andrew
|
||||
Purtell)
|
||||
HBASE-1311 ZooKeeperWrapper: Failed to set watcher on ZNode /hbase/master
|
||||
(Nitay Joffe via Stack)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
|
||||
|
|
|
@ -521,7 +521,19 @@ class RegionManager implements HConstants {
|
|||
return Collections.unmodifiableMap(onlineMetaRegions);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public boolean metaRegionsInTransition() {
|
||||
synchronized (onlineMetaRegions) {
|
||||
for (MetaRegion metaRegion : onlineMetaRegions.values()) {
|
||||
String regionName = Bytes.toString(metaRegion.getRegionName());
|
||||
if (regionIsInTransition(regionName)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the root and meta scanners so that the region servers serving meta
|
||||
* regions can shut down.
|
||||
|
|
|
@ -69,8 +69,9 @@ abstract class RegionServerOperation implements Delayed, HConstants {
|
|||
|
||||
protected boolean metaTableAvailable() {
|
||||
boolean available = true;
|
||||
if (master.regionManager.numMetaRegions() !=
|
||||
master.regionManager.numOnlineMetaRegions()) {
|
||||
if ((master.regionManager.numMetaRegions() !=
|
||||
master.regionManager.numOnlineMetaRegions()) ||
|
||||
master.regionManager.metaRegionsInTransition()) {
|
||||
// We can't proceed because not all of the meta regions are online.
|
||||
// We can't block either because that would prevent the meta region
|
||||
// online message from being processed. In order to prevent spinning
|
||||
|
|
|
@ -107,6 +107,7 @@ import org.apache.hadoop.util.StringUtils;
|
|||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.Watcher.Event.EventType;
|
||||
import org.apache.zookeeper.Watcher.Event.KeeperState;
|
||||
|
||||
/**
|
||||
* HRegionServer makes a set of HRegions available to clients. It checks in with
|
||||
|
@ -136,7 +137,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
// If false, the file system has become unavailable
|
||||
protected volatile boolean fsOk;
|
||||
|
||||
protected final HServerInfo serverInfo;
|
||||
protected HServerInfo serverInfo;
|
||||
protected final HBaseConfiguration conf;
|
||||
|
||||
private final ServerConnection connection;
|
||||
|
@ -167,10 +168,10 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
|
||||
// Server to handle client requests. Default access so can be accessed by
|
||||
// unit tests.
|
||||
final HBaseServer server;
|
||||
HBaseServer server;
|
||||
|
||||
// Leases
|
||||
private final Leases leases;
|
||||
private Leases leases;
|
||||
|
||||
// Request counter
|
||||
private volatile AtomicInteger requestCount = new AtomicInteger();
|
||||
|
@ -193,20 +194,20 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
private RegionServerMetrics metrics;
|
||||
|
||||
// Compactions
|
||||
final CompactSplitThread compactSplitThread;
|
||||
CompactSplitThread compactSplitThread;
|
||||
|
||||
// Cache flushing
|
||||
final MemcacheFlusher cacheFlusher;
|
||||
MemcacheFlusher cacheFlusher;
|
||||
|
||||
/* Check for major compactions.
|
||||
*/
|
||||
final Chore majorCompactionChecker;
|
||||
Chore majorCompactionChecker;
|
||||
|
||||
// HLog and HLog roller. log is protected rather than private to avoid
|
||||
// eclipse warning when accessed by inner classes
|
||||
protected volatile HLog log;
|
||||
final LogRoller logRoller;
|
||||
final LogFlusher logFlusher;
|
||||
LogRoller logRoller;
|
||||
LogFlusher logFlusher;
|
||||
|
||||
// limit compactions while starting up
|
||||
CompactionLimitThread compactionLimitThread;
|
||||
|
@ -217,13 +218,23 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
final Map<String, InternalScanner> scanners =
|
||||
new ConcurrentHashMap<String, InternalScanner>();
|
||||
|
||||
private final ZooKeeperWrapper zooKeeperWrapper;
|
||||
private ZooKeeperWrapper zooKeeperWrapper;
|
||||
|
||||
// A sleeper that sleeps for msgInterval.
|
||||
private final Sleeper sleeper;
|
||||
|
||||
private final long rpcTimeout;
|
||||
|
||||
// Address passed in to constructor.
|
||||
private final HServerAddress address;
|
||||
|
||||
// The main region server thread.
|
||||
private Thread regionServerThread;
|
||||
|
||||
// Run HDFS shutdown thread on exit if this is set. We clear this out when
|
||||
// doing a restart() to prevent closing of HDFS.
|
||||
private final AtomicBoolean shutdownHDFS = new AtomicBoolean(true);
|
||||
|
||||
/**
|
||||
* Starts a HRegionServer at the default location
|
||||
* @param conf
|
||||
|
@ -241,7 +252,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
* @throws IOException
|
||||
*/
|
||||
public HRegionServer(HServerAddress address, HBaseConfiguration conf)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
this.address = address;
|
||||
this.abortRequested = false;
|
||||
this.fsOk = true;
|
||||
this.conf = conf;
|
||||
|
@ -258,6 +270,73 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
|
||||
sleeper = new Sleeper(this.msgInterval, this.stopRequested);
|
||||
|
||||
// Task thread to process requests from Master
|
||||
this.worker = new Worker();
|
||||
|
||||
this.numRegionsToReport =
|
||||
conf.getInt("hbase.regionserver.numregionstoreport", 10);
|
||||
|
||||
this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
|
||||
|
||||
reinitialize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates all of the state that needs to be reconstructed in case we are
|
||||
* doing a restart. This is shared between the constructor and restart().
|
||||
* @throws IOException
|
||||
*/
|
||||
private void reinitialize() throws IOException {
|
||||
abortRequested = false;
|
||||
stopRequested.set(false);
|
||||
shutdownHDFS.set(true);
|
||||
|
||||
// Server to handle client requests
|
||||
this.server = HBaseRPC.getServer(this, address.getBindAddress(),
|
||||
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
|
||||
false, conf);
|
||||
this.server.setErrorHandler(this);
|
||||
String machineName = DNS.getDefaultHost(
|
||||
conf.get("hbase.regionserver.dns.interface","default"),
|
||||
conf.get("hbase.regionserver.dns.nameserver","default"));
|
||||
// Address is given a default IP for the moment. Will be changed after
|
||||
// calling the master.
|
||||
this.serverInfo = new HServerInfo(new HServerAddress(
|
||||
new InetSocketAddress(address.getBindAddress(),
|
||||
this.server.getListenerAddress().getPort())), System.currentTimeMillis(),
|
||||
this.conf.getInt("hbase.regionserver.info.port", 60030), machineName);
|
||||
if (this.serverInfo.getServerAddress() == null) {
|
||||
throw new NullPointerException("Server address cannot be null; " +
|
||||
"hbase-958 debugging");
|
||||
}
|
||||
|
||||
reinitializeThreads();
|
||||
|
||||
reinitializeZooKeeper();
|
||||
|
||||
int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
|
||||
for(int i = 0; i < nbBlocks; i++) {
|
||||
reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
|
||||
}
|
||||
}
|
||||
|
||||
private void reinitializeZooKeeper() throws IOException {
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf);
|
||||
watchMasterAddress();
|
||||
|
||||
boolean startCodeOk = false;
|
||||
while(!startCodeOk) {
|
||||
serverInfo.setStartCode(System.currentTimeMillis());
|
||||
startCodeOk = zooKeeperWrapper.writeRSLocation(serverInfo);
|
||||
if(!startCodeOk) {
|
||||
LOG.debug("Start code already taken, trying another one");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void reinitializeThreads() {
|
||||
this.workerThread = new Thread(worker);
|
||||
|
||||
// Cache flushing thread.
|
||||
this.cacheFlusher = new MemcacheFlusher(conf, this);
|
||||
|
||||
|
@ -278,53 +357,9 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
this.majorCompactionChecker = new MajorCompactionChecker(this,
|
||||
this.threadWakeFrequency * multiplier, this.stopRequested);
|
||||
|
||||
// Task thread to process requests from Master
|
||||
this.worker = new Worker();
|
||||
this.workerThread = new Thread(worker);
|
||||
|
||||
// Server to handle client requests
|
||||
this.server = HBaseRPC.getServer(this, address.getBindAddress(),
|
||||
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
|
||||
false, conf);
|
||||
this.server.setErrorHandler(this);
|
||||
String machineName = DNS.getDefaultHost(
|
||||
conf.get("hbase.regionserver.dns.interface","default"),
|
||||
conf.get("hbase.regionserver.dns.nameserver","default"));
|
||||
// Address is given a default IP for the moment. Will be changed after
|
||||
// calling the master.
|
||||
this.serverInfo = new HServerInfo(new HServerAddress(
|
||||
new InetSocketAddress(address.getBindAddress(),
|
||||
this.server.getListenerAddress().getPort())), System.currentTimeMillis(),
|
||||
this.conf.getInt("hbase.regionserver.info.port", 60030), machineName);
|
||||
|
||||
if (this.serverInfo.getServerAddress() == null) {
|
||||
throw new NullPointerException("Server address cannot be null; " +
|
||||
"hbase-958 debugging");
|
||||
}
|
||||
this.zooKeeperWrapper = new ZooKeeperWrapper(conf);
|
||||
watchMasterAddress();
|
||||
|
||||
boolean startCodeOk = false;
|
||||
while(!startCodeOk) {
|
||||
serverInfo.setStartCode(System.currentTimeMillis());
|
||||
startCodeOk = zooKeeperWrapper.writeRSLocation(serverInfo);
|
||||
if(!startCodeOk) {
|
||||
LOG.debug("Start code already taken, trying another one");
|
||||
}
|
||||
}
|
||||
|
||||
this.numRegionsToReport =
|
||||
conf.getInt("hbase.regionserver.numregionstoreport", 10);
|
||||
|
||||
this.leases = new Leases(
|
||||
conf.getInt("hbase.regionserver.lease.period", 60 * 1000),
|
||||
this.threadWakeFrequency);
|
||||
|
||||
int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
|
||||
for(int i = 0; i < nbBlocks; i++) {
|
||||
reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
|
||||
}
|
||||
this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
|
||||
conf.getInt("hbase.regionserver.lease.period", 60 * 1000),
|
||||
this.threadWakeFrequency);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -336,14 +371,25 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
*/
|
||||
public void process(WatchedEvent event) {
|
||||
EventType type = event.getType();
|
||||
LOG.info("Got ZooKeeper event, state: " + event.getState() + ", type: " +
|
||||
KeeperState state = event.getState();
|
||||
LOG.info("Got ZooKeeper event, state: " + state + ", type: " +
|
||||
type + ", path: " + event.getPath());
|
||||
if (type == EventType.NodeCreated) {
|
||||
getMaster();
|
||||
|
||||
// Ignore events if we're shutting down.
|
||||
if (stopRequested.get()) {
|
||||
LOG.debug("Ignoring ZooKeeper event while shutting down");
|
||||
return;
|
||||
}
|
||||
|
||||
// ZooKeeper watches are one time only, so we need to re-register our watch.
|
||||
watchMasterAddress();
|
||||
if (state == KeeperState.Expired) {
|
||||
LOG.error("ZooKeeper session expired");
|
||||
restart();
|
||||
} else if (type == EventType.NodeCreated) {
|
||||
getMaster();
|
||||
|
||||
// ZooKeeper watches are one time only, so we need to re-register our watch.
|
||||
watchMasterAddress();
|
||||
}
|
||||
}
|
||||
|
||||
private void watchMasterAddress() {
|
||||
|
@ -353,12 +399,41 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
}
|
||||
}
|
||||
|
||||
private void restart() {
|
||||
LOG.info("Restarting Region Server");
|
||||
|
||||
shutdownHDFS.set(false);
|
||||
abort();
|
||||
Threads.shutdown(regionServerThread);
|
||||
|
||||
boolean done = false;
|
||||
while (!done) {
|
||||
try {
|
||||
reinitialize();
|
||||
done = true;
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Error trying to reinitialize ZooKeeper", e);
|
||||
}
|
||||
}
|
||||
|
||||
Thread t = new Thread(this);
|
||||
String name = regionServerThread.getName();
|
||||
t.setName(name);
|
||||
t.start();
|
||||
}
|
||||
|
||||
/** @return ZooKeeperWrapper used by RegionServer. */
|
||||
public ZooKeeperWrapper getZooKeeperWrapper() {
|
||||
return zooKeeperWrapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* The HRegionServer sticks in this loop until closed. It repeatedly checks
|
||||
* in with the HMaster, sending heartbeats & reports, and receiving HRegion
|
||||
* load/unload instructions.
|
||||
*/
|
||||
public void run() {
|
||||
regionServerThread = Thread.currentThread();
|
||||
boolean quiesceRequested = false;
|
||||
try {
|
||||
init(reportForDuty());
|
||||
|
@ -600,8 +675,11 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
}
|
||||
join();
|
||||
|
||||
runThread(this.hdfsShutdownThread,
|
||||
this.conf.getLong("hbase.dfs.shutdown.wait", 30000));
|
||||
if (shutdownHDFS.get()) {
|
||||
runThread(this.hdfsShutdownThread,
|
||||
this.conf.getLong("hbase.dfs.shutdown.wait", 30000));
|
||||
}
|
||||
|
||||
LOG.info(Thread.currentThread().getName() + " exiting");
|
||||
}
|
||||
|
||||
|
|
|
@ -110,8 +110,7 @@ public class MiniHBaseCluster implements HConstants {
|
|||
* @param serverNumber Used as index into a list.
|
||||
*/
|
||||
public void abortRegionServer(int serverNumber) {
|
||||
HRegionServer server =
|
||||
this.hbaseCluster.getRegionServers().get(serverNumber).getRegionServer();
|
||||
HRegionServer server = getRegionServer(serverNumber);
|
||||
LOG.info("Aborting " + server.getServerInfo().toString());
|
||||
server.abort();
|
||||
}
|
||||
|
|
|
@ -21,10 +21,14 @@ package org.apache.hadoop.hbase;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
|
@ -106,4 +110,43 @@ public class TestZooKeeper extends HBaseClusterTestCase {
|
|||
System.err.println("ZooKeeper should have timed out");
|
||||
connection.relocateRegion(HConstants.ROOT_TABLE_NAME, HConstants.EMPTY_BYTE_ARRAY);
|
||||
}
|
||||
|
||||
/**
|
||||
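* Closes region server 0's ZooKeeper session out from under it, then checks that a table can still be created and written to.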
|
||||
*/
|
||||
public void testRegionServerSessionExpired() {
|
||||
try {
|
||||
new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
|
||||
String quorumServers = zooKeeperCluster.getQuorumServers();
|
||||
int sessionTimeout = conf.getInt("zookeeper.session.timeout", 2 * 1000);
|
||||
|
||||
Watcher watcher = new EmptyWatcher();
|
||||
HRegionServer rs = cluster.getRegionServer(0);
|
||||
ZooKeeperWrapper rsZK = rs.getZooKeeperWrapper();
|
||||
long sessionID = rsZK.getSessionID();
|
||||
byte[] password = rsZK.getSessionPassword();
|
||||
|
||||
ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, sessionID, password);
|
||||
zk.close();
|
||||
|
||||
Thread.sleep(sessionTimeout * 3);
|
||||
|
||||
new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
|
||||
HBaseAdmin admin = new HBaseAdmin(conf);
|
||||
HTableDescriptor desc = new HTableDescriptor("test");
|
||||
HColumnDescriptor family = new HColumnDescriptor("fam:");
|
||||
desc.addFamily(family);
|
||||
admin.createTable(desc);
|
||||
|
||||
HTable table = new HTable("test");
|
||||
BatchUpdate batchUpdate = new BatchUpdate("testrow");
|
||||
batchUpdate.put("fam:col", Bytes.toBytes("testdata"));
|
||||
table.commit(batchUpdate);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue