From 196882e2cd68962d64e0a009a6f28496379f55cb Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Mon, 3 May 2010 19:53:39 +0000 Subject: [PATCH] HBASE-2482 regions in transition do not get reassigned by master when RS crashes git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@940589 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 4 +- .../java/org/apache/hadoop/hbase/HMsg.java | 9 +- .../hadoop/hbase/LocalHBaseCluster.java | 16 +- .../apache/hadoop/hbase/master/HMaster.java | 12 +- .../hbase/master/ProcessServerShutdown.java | 23 +- .../hadoop/hbase/master/RegionManager.java | 148 +++++----- .../master/RegionServerOperationListener.java | 15 + .../master/RegionServerOperationQueue.java | 20 ++ .../hadoop/hbase/master/ServerManager.java | 26 +- .../hbase/regionserver/HRegionServer.java | 39 ++- .../apache/hadoop/hbase/MiniHBaseCluster.java | 35 ++- .../hbase/master/TestMasterTransistions.java | 256 +++++++++++++++++- 12 files changed, 484 insertions(+), 119 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 78845d66f33..6e45029f68e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -300,8 +300,8 @@ Release 0.21.0 - Unreleased HBASE-2499 Race condition when disabling a table leaves regions in transition HBASE-2489 Make the "Filesystem needs to be upgraded" error message more useful (Benoit Sigoure via Stack) - - + HBASE-2482 regions in transition do not get reassigned by master when RS + crashes (Todd Lipcon via Stack) IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable diff --git a/core/src/main/java/org/apache/hadoop/hbase/HMsg.java b/core/src/main/java/org/apache/hadoop/hbase/HMsg.java index 852140bedba..7905b65e74f 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/HMsg.java +++ b/core/src/main/java/org/apache/hadoop/hbase/HMsg.java @@ -1,5 +1,5 @@ /** - * Copyright 2007 The Apache Software Foundation + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -125,6 +125,13 @@ public class HMsg implements Writable { * rather than send them individually in MSG_REPORT_OPEN messages. */ MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, + + /** + * When RegionServer receives this message, it goes into a sleep that only + * an exit will cure. This message is sent by unit tests simulating + * pathological states. + */ + TESTING_MSG_BLOCK_RS, } private Type type = null; diff --git a/core/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/core/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index 801d138f8d1..767bc9907a8 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/core/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -82,10 +82,15 @@ public class LocalHBaseCluster implements HConstants { * @param noRegionServers Count of regionservers to start. * @throws IOException */ - public LocalHBaseCluster(final Configuration conf, - final int noRegionServers) + public LocalHBaseCluster(final Configuration conf, final int noRegionServers) throws IOException { - this(conf, noRegionServers, HMaster.class); + this(conf, noRegionServers, HMaster.class, getRegionServerImplementation(conf)); + } + + @SuppressWarnings("unchecked") + private static Class getRegionServerImplementation(final Configuration conf) { + return (Class)conf.getClass(HConstants.REGION_SERVER_IMPL, + HRegionServer.class); } /** @@ -98,7 +103,8 @@ public class LocalHBaseCluster implements HConstants { */ @SuppressWarnings("unchecked") public LocalHBaseCluster(final Configuration conf, - final int noRegionServers, final Class masterClass) + final int noRegionServers, final Class masterClass, + final Class regionServerClass) throws IOException { this.conf = conf; // Create the master @@ -111,7 +117,7 @@ public class LocalHBaseCluster implements HConstants { new CopyOnWriteArrayList(); this.regionServerClass = (Class)conf.getClass(HConstants.REGION_SERVER_IMPL, - HRegionServer.class); + regionServerClass); for (int i = 0; i < noRegionServers; i++) { addRegionServer(i); } diff --git a/core/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/core/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c40ecdda5cc..0103a0ca512 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/core/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -90,6 +90,8 @@ import java.util.NavigableMap; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * HMaster is the "master server" for HBase. An HBase cluster has one active @@ -124,6 +126,9 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, // Metrics is set when we call run. private final MasterMetrics metrics; + + final Lock splitLogLock = new ReentrantLock(); + // Our zk client. private ZooKeeperWrapper zooKeeperWrapper; // Watcher for master address and for cluster shutdown. @@ -561,7 +566,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, if(this.serverManager.getServerInfo(serverName) == null) { LOG.info("Log folder doesn't belong " + "to a known region server, splitting"); - this.regionManager.splitLogLock.lock(); + this.splitLogLock.lock(); Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName)); try { @@ -569,7 +574,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, } catch (IOException e) { LOG.error("Failed splitting " + logDir.toString(), e); } finally { - this.regionManager.splitLogLock.unlock(); + this.splitLogLock.unlock(); } } else { LOG.info("Log folder belongs to an existing region server"); @@ -1127,7 +1132,8 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, return c.newInstance(conf); } catch (Exception e) { throw new RuntimeException("Failed construction of " + - "Master: " + masterClass.toString(), e); + "Master: " + masterClass.toString() + + ((e.getCause() != null)? e.getCause().getMessage(): ""), e); } } diff --git a/core/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java b/core/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java index af7daadfd10..ff926ee0213 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java +++ b/core/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java @@ -30,11 +30,13 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.master.RegionManager.RegionState; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -78,6 +80,7 @@ class ProcessServerShutdown extends RegionServerOperation { // check to see if I am responsible for either ROOT or any of the META tables. + // TODO Why do we do this now instead of at processing time? closeMetaRegions(); } @@ -116,6 +119,19 @@ class ProcessServerShutdown extends RegionServerOperation { return this.deadServerAddress; } + private void closeRegionsInTransition() { + Map inTransition = + master.getRegionManager().getRegionsInTransitionOnServer(deadServer); + for (Map.Entry entry : inTransition.entrySet()) { + String regionName = entry.getKey(); + RegionState state = entry.getValue(); + + LOG.info("Region " + regionName + " was in transition " + + state + " on dead server " + deadServer + " - marking unassigned"); + master.getRegionManager().setUnassigned(state.getRegionInfo(), true); + } + } + @Override public String toString() { return "ProcessServerShutdown of " + this.deadServer; @@ -282,7 +298,7 @@ class ProcessServerShutdown extends RegionServerOperation { if (!logSplit) { // Process the old log file if (this.master.getFileSystem().exists(rsLogDir)) { - if (!master.getRegionManager().splitLogLock.tryLock()) { + if (!master.splitLogLock.tryLock()) { return false; } try { @@ -290,7 +306,7 @@ class ProcessServerShutdown extends RegionServerOperation { this.master.getOldLogDir(), this.master.getFileSystem(), this.master.getConfiguration()); } finally { - master.getRegionManager().splitLogLock.unlock(); + master.splitLogLock.unlock(); } } logSplit = true; @@ -355,6 +371,9 @@ class ProcessServerShutdown extends RegionServerOperation { Bytes.toString(r.getRegionName()) + " on " + r.getServer()); } } + + closeRegionsInTransition(); + // Remove this server from dead servers list. Finished splitting logs. this.master.getServerManager().removeDeadServer(deadServer); if (LOG.isDebugEnabled()) { diff --git a/core/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java b/core/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java index d540dadfe3e..4fd53f6373e 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java +++ b/core/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.util.Writables; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -54,8 +55,6 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; /** * Class to manage assigning regions to servers, state of root and meta, etc. @@ -66,8 +65,6 @@ public class RegionManager implements HConstants { private AtomicReference rootRegionLocation = new AtomicReference(null); - final Lock splitLogLock = new ReentrantLock(); - private final RootScanner rootScannerThread; final MetaScanner metaScannerThread; @@ -166,8 +163,8 @@ public class RegionManager implements HConstants { unsetRootRegion(); if (!master.getShutdownRequested().get()) { synchronized (regionsInTransition) { - RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO); - s.setUnassigned(); + RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO, + RegionState.State.UNASSIGNED); regionsInTransition.put( HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(), s); LOG.info("ROOT inserted into regionsInTransition"); @@ -572,6 +569,23 @@ public class RegionManager implements HConstants { } return false; } + + /** + * Return a map of the regions in transition on a server. + * Returned map entries are region name -> RegionState + */ + Map getRegionsInTransitionOnServer(String serverName) { + Map ret = new HashMap(); + synchronized (regionsInTransition) { + for (Map.Entry entry : regionsInTransition.entrySet()) { + RegionState rs = entry.getValue(); + if (serverName.equals(rs.getServerName())) { + ret.put(entry.getKey(), rs); + } + } + } + return ret; + } /** * Stop the root and meta scanners so that the region servers serving meta @@ -824,6 +838,10 @@ public class RegionManager implements HConstants { && !s.isUnassigned() && s.getServerName() != null && s.getServerName().equals(server.toString())) { + // TODO this code appears to be entirely broken, since + // server.toString() has no start code, but s.getServerName() + // does! + LOG.fatal("I DONT BELIEVE YOU WILL EVER SEE THIS!"); // Has an outstanding meta region to be assigned. return true; } @@ -956,7 +974,7 @@ public class RegionManager implements HConstants { synchronized(this.regionsInTransition) { s = regionsInTransition.get(info.getRegionNameAsString()); if (s == null) { - s = new RegionState(info); + s = new RegionState(info, RegionState.State.UNASSIGNED); regionsInTransition.put(info.getRegionNameAsString(), s); } } @@ -1038,7 +1056,7 @@ public class RegionManager implements HConstants { RegionState s = this.regionsInTransition.get(regionInfo.getRegionNameAsString()); if (s == null) { - s = new RegionState(regionInfo); + s = new RegionState(regionInfo, RegionState.State.CLOSING); } // If region was asked to open before getting here, we could be taking // the wrong server name @@ -1474,22 +1492,30 @@ public class RegionManager implements HConstants { * note on regionsInTransition data member above for listing of state * transitions. */ - private static class RegionState implements Comparable { + static class RegionState implements Comparable { private final HRegionInfo regionInfo; - private volatile boolean unassigned = false; - private volatile boolean pendingOpen = false; - private volatile boolean open = false; - private volatile boolean closing = false; - private volatile boolean pendingClose = false; - private volatile boolean closed = false; - private volatile boolean offlined = false; + + enum State { + UNASSIGNED, // awaiting a server to be assigned + PENDING_OPEN, // told a server to open, hasn't opened yet + OPEN, // has been opened on RS, but not yet marked in META/ROOT + CLOSING, // a msg has been enqueued to close ths region, but not delivered to RS yet + PENDING_CLOSE, // msg has been delivered to RS to close this region + CLOSED // region has been closed but not yet marked in meta + + } + + private State state; + + private boolean isOfflined; /* Set when region is assigned or closing */ - private volatile String serverName = null; + private String serverName = null; /* Constructor */ - RegionState(HRegionInfo info) { + RegionState(HRegionInfo info, State state) { this.regionInfo = info; + this.state = state; } synchronized HRegionInfo getRegionInfo() { @@ -1511,14 +1537,16 @@ public class RegionManager implements HConstants { * @return true if the region is being opened */ synchronized boolean isOpening() { - return this.unassigned || this.pendingOpen || this.open; + return state == State.UNASSIGNED || + state == State.PENDING_OPEN || + state == State.OPEN; } /* * @return true if region is unassigned */ synchronized boolean isUnassigned() { - return unassigned; + return state == State.UNASSIGNED; } /* @@ -1527,120 +1555,84 @@ public class RegionManager implements HConstants { * called unless it is safe to do so. */ synchronized void setUnassigned() { - this.unassigned = true; - this.pendingOpen = false; - this.open = false; - this.closing = false; - this.pendingClose = false; - this.closed = false; - this.offlined = false; + state = State.UNASSIGNED; this.serverName = null; } synchronized boolean isPendingOpen() { - return pendingOpen; + return state == State.PENDING_OPEN; } /* * @param serverName Server region was assigned to. */ synchronized void setPendingOpen(final String serverName) { - if (!this.unassigned) { + if (state != State.UNASSIGNED) { LOG.warn("Cannot assign a region that is not currently unassigned. " + "FIX!! State: " + toString()); } - this.unassigned = false; - this.pendingOpen = true; - this.open = false; - this.closing = false; - this.pendingClose = false; - this.closed = false; - this.offlined = false; + state = State.PENDING_OPEN; this.serverName = serverName; } synchronized boolean isOpen() { - return open; + return state == State.OPEN; } synchronized void setOpen() { - if (!pendingOpen) { + if (state != State.PENDING_OPEN) { LOG.warn("Cannot set a region as open if it has not been pending. " + "FIX!! State: " + toString()); } - this.unassigned = false; - this.pendingOpen = false; - this.open = true; - this.closing = false; - this.pendingClose = false; - this.closed = false; - this.offlined = false; + state = State.OPEN; } synchronized boolean isClosing() { - return closing; + return state == State.CLOSING; } synchronized void setClosing(String serverName, boolean setOffline) { - this.unassigned = false; - this.pendingOpen = false; - this.open = false; - this.closing = true; - this.pendingClose = false; - this.closed = false; - this.offlined = setOffline; + state = State.CLOSING; this.serverName = serverName; + this.isOfflined = setOffline; } synchronized boolean isPendingClose() { - return this.pendingClose; + return state == State.PENDING_CLOSE; } synchronized void setPendingClose() { - if (!closing) { + if (state != State.CLOSING) { LOG.warn("Cannot set a region as pending close if it has not been " + "closing. FIX!! State: " + toString()); } - this.unassigned = false; - this.pendingOpen = false; - this.open = false; - this.closing = false; - this.pendingClose = true; - this.closed = false; + state = State.PENDING_CLOSE; } synchronized boolean isClosed() { - return this.closed; + return state == State.CLOSED; } synchronized void setClosed() { - if (!pendingClose && !pendingOpen && !closing) { + if (state != State.PENDING_CLOSE && + state != State.PENDING_OPEN && + state != State.CLOSING) { throw new IllegalStateException( "Cannot set a region to be closed if it was not already marked as" + - " pending close, pending open or closing. State: " + toString()); + " pending close, pending open or closing. State: " + this); } - this.unassigned = false; - this.pendingOpen = false; - this.open = false; - this.closing = false; - this.pendingClose = false; - this.closed = true; + state = State.CLOSED; } synchronized boolean isOfflined() { - return this.offlined; + return (state == State.CLOSING || + state == State.PENDING_CLOSE) && isOfflined; } @Override public synchronized String toString() { return ("name=" + Bytes.toString(getRegionName()) + - ", unassigned=" + this.unassigned + - ", pendingOpen=" + this.pendingOpen + - ", open=" + this.open + - ", closing=" + this.closing + - ", pendingClose=" + this.pendingClose + - ", closed=" + this.closed + - ", offlined=" + this.offlined); + ", state=" + this.state); } @Override diff --git a/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationListener.java b/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationListener.java index da5406523fb..d221110b17b 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationListener.java +++ b/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationListener.java @@ -21,12 +21,27 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.HServerInfo; + /** * Listener for regionserver events in master. * @see HMaster#registerRegionServerOperationListener(RegionServerOperationListener) * @see HMaster#unregisterRegionServerOperationListener(RegionServerOperationListener) */ public interface RegionServerOperationListener { + /** + * Called for each message passed the master. Most of the messages that come + * in here will go on to become {@link #process(RegionServerOperation)}s but + * others like {@linke HMsg.Type#MSG_REPORT_PROCESS_OPEN} go no further; + * only in here can you see them come in. + * @param serverInfo Server we got the message from. + * @param incomingMsg The message received. + * @return True to continue processing, false to skip. + */ + public boolean process(final HServerInfo serverInfo, + final HMsg incomingMsg); + /** * Called before processing op * @param op diff --git a/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationQueue.java b/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationQueue.java index b004f530849..0d7b5939899 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationQueue.java +++ b/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationQueue.java @@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.ipc.RemoteException; @@ -216,6 +218,24 @@ public class RegionServerOperationQueue { } } + /** + * Called for each message passed the master. Most of the messages that come + * in here will go on to become {@link #process(RegionServerOperation)}s but + * others like {@linke HMsg.Type#MSG_REPORT_PROCESS_OPEN} go no further; + * only in here can you see them come in. + * @param serverInfo Server we got the message from. + * @param incomingMsg The message received. + * @return True to continue processing, false to skip. + */ + boolean process(final HServerInfo serverInfo, + final HMsg incomingMsg) { + if (this.listeners.isEmpty()) return true; + for (RegionServerOperationListener listener: this.listeners) { + if (!listener.process(serverInfo, incomingMsg)) return false; + } + return true; + } + /* * Tell listeners that we processed a RegionServerOperation. * @param op Operation to tell the world about. diff --git a/core/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/core/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 4ba83020d01..f32ffa2a84b 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/core/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.Leases; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.master.RegionManager.RegionState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.zookeeper.WatchedEvent; @@ -326,7 +327,11 @@ public class ServerManager implements HConstants { } } - /* Region server is exiting + /* + * Region server is exiting with a clean shutdown. + * + * In this case, the server sends MSG_REPORT_EXITING in msgs[0] followed by + * a MSG_REPORT_CLOSE for each region it was serving. * @param serverInfo * @param msgs */ @@ -347,6 +352,7 @@ public class ServerManager implements HConstants { for (int i = 1; i < msgs.length; i++) { LOG.info("Processing " + msgs[i] + " from " + serverInfo.getServerName()); + assert msgs[i].getType() == HMsg.Type.MSG_REGION_CLOSE; HRegionInfo info = msgs[i].getRegionInfo(); // Meta/root region offlining is handed in removeServerInfo above. if (!info.isMetaRegion()) { @@ -361,6 +367,18 @@ public class ServerManager implements HConstants { } } } + + // There should not be any regions in transition for this server - the + // server should finish transitions itself before closing + Map inTransition = + master.getRegionManager().getRegionsInTransitionOnServer( + serverInfo.getServerName()); + for (Map.Entry entry : inTransition.entrySet()) { + LOG.warn("Region server " + serverInfo.getServerName() + + " shut down with region " + entry.getKey() + " in transition " + + "state " + entry.getValue()); + master.getRegionManager().setUnassigned(entry.getValue().getRegionInfo(), true); + } } // We don't need to return anything to the server because it isn't // going to do any more work. @@ -418,7 +436,7 @@ public class ServerManager implements HConstants { * @return */ private HMsg[] processMsgs(HServerInfo serverInfo, - HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[]) { + HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[]) { ArrayList returnMsgs = new ArrayList(); if (serverInfo.getServerAddress() == null) { throw new NullPointerException("Server address cannot be null; " + @@ -433,6 +451,10 @@ public class ServerManager implements HConstants { LOG.info("Processing " + incomingMsgs[i] + " from " + serverInfo.getServerName() + "; " + (i + 1) + " of " + incomingMsgs.length); + if (!this.master.getRegionServerOperationQueue(). + process(serverInfo, incomingMsgs[i])) { + continue; + } switch (incomingMsgs[i].getType()) { case MSG_REPORT_PROCESS_OPEN: openingCount++; diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2b0e6d70845..865507fda89 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -132,7 +132,7 @@ public class HRegionServer implements HConstants, HRegionInterface, // Go down hard. Used if file system becomes unavailable and also in // debugging and unit tests. protected volatile boolean abortRequested; - + // If false, the file system has become unavailable protected volatile boolean fsOk; @@ -666,7 +666,7 @@ public class HRegionServer implements HConstants, HRegionInterface, } join(); - zooKeeperWrapper.close(); + this.zooKeeperWrapper.close(); LOG.info(Thread.currentThread().getName() + " exiting"); } @@ -1423,6 +1423,14 @@ public class HRegionServer implements HConstants, HRegionInterface, region.flushcache(); break; + case TESTING_MSG_BLOCK_RS: + while (!stopRequested.get()) { + Threads.sleep(1000); + LOG.info("Regionserver blocked by " + + HMsg.Type.TESTING_MSG_BLOCK_RS + "; " + stopRequested.get()); + } + break; + default: throw new AssertionError( "Impossible state during msg processing. Instruction: " @@ -1461,7 +1469,7 @@ public class HRegionServer implements HConstants, HRegionInterface, } } } - + void openRegion(final HRegionInfo regionInfo) { Integer mapKey = Bytes.mapKey(regionInfo.getRegionName()); HRegion region = this.onlineRegions.get(mapKey); @@ -2383,7 +2391,7 @@ public class HRegionServer implements HConstants, HRegionInterface, */ public static Thread startRegionServer(final HRegionServer hrs) { return startRegionServer(hrs, - "regionserver" + hrs.server.getListenerAddress()); + "regionserver" + hrs.getServerInfo().getServerAddress().getPort()); } /** @@ -2411,6 +2419,24 @@ public class HRegionServer implements HConstants, HRegionInterface, System.exit(0); } + /** + * Utility for constructing an instance of the passed HRegionServer class. + * @param regionServerClass + * @param conf2 + * @return HRegionServer instance. + */ + public static HRegionServer constructRegionServer(Class regionServerClass, + final Configuration conf2) { + try { + Constructor c = + regionServerClass.getConstructor(HBaseConfiguration.class); + return c.newInstance(conf2); + } catch (Exception e) { + throw new RuntimeException("Failed construction of " + + "Master: " + regionServerClass.toString(), e); + } + } + /** * Do class main. * @param args @@ -2438,9 +2464,8 @@ public class HRegionServer implements HConstants, HRegionInterface, if (runtime != null) { LOG.info("vmInputArguments=" + runtime.getInputArguments()); } - Constructor c = - regionServerClass.getConstructor(Configuration.class); - startRegionServer(c.newInstance(conf)); + HRegionServer hrs = constructRegionServer(regionServerClass, conf); + startRegionServer(hrs); } } catch (Throwable t) { LOG.error( "Can not start region server because "+ diff --git a/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 8cef1eef829..b91c4e71da8 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -109,13 +109,24 @@ public class MiniHBaseCluster implements HConstants { } } + /** + * Subclass so can get at protected methods (none at moment). + */ + public static class MiniHBaseClusterRegionServer extends HRegionServer { + public MiniHBaseClusterRegionServer(HBaseConfiguration conf) + throws IOException { + super(conf); + } + } + private void init(final int nRegionNodes) throws IOException { try { // start up a LocalHBaseCluster while (true) { try { hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes, - MiniHBaseCluster.MiniHBaseClusterMaster.class); + MiniHBaseCluster.MiniHBaseClusterMaster.class, + MiniHBaseCluster.MiniHBaseClusterRegionServer.class); hbaseCluster.startup(); } catch (BindException e) { //this port is already in use. try to use another (for multiple testing) @@ -137,13 +148,13 @@ public class MiniHBaseCluster implements HConstants { * Starts a region server thread running * * @throws IOException - * @return Name of regionserver started. + * @return New RegionServerThread */ - public String startRegionServer() throws IOException { + public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException { JVMClusterUtil.RegionServerThread t = this.hbaseCluster.addRegionServer(); t.start(); t.waitForServerOnline(); - return t.getName(); + return t; } /** @@ -296,7 +307,21 @@ public class MiniHBaseCluster implements HConstants { public void addMessageToSendRegionServer(final int serverNumber, final HMsg msg) throws IOException { - HRegionServer hrs = getRegionServer(serverNumber); + MiniHBaseClusterRegionServer hrs = + (MiniHBaseClusterRegionServer)getRegionServer(serverNumber); + addMessageToSendRegionServer(hrs, msg); + } + + /** + * Add a message to include in the responses send a regionserver when it + * checks back in. + * @param hrs Which region server. + * @param msg The MESSAGE + * @throws IOException + */ + public void addMessageToSendRegionServer(final MiniHBaseClusterRegionServer hrs, + final HMsg msg) + throws IOException { ((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg); } } diff --git a/core/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransistions.java b/core/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransistions.java index 916e30d49d4..99241dc222a 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransistions.java +++ b/core/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransistions.java @@ -19,35 +19,45 @@ */ package org.apache.hadoop.hbase.master; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Collection; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HMsg; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Writables; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; /** - * Test transitions of state across the master. + * Test transitions of state across the master. Sets up the cluster once and + * then runs a couple of tests. */ public class TestMasterTransistions { + private static final Log LOG = LogFactory.getLog(TestMasterTransistions.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final String TABLENAME = "master_transitions"; private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"), @@ -63,14 +73,159 @@ public class TestMasterTransistions { // Create a table of three families. This will assign a region. TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES); HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); - int countOfRegions = TEST_UTIL.createMultiRegions(t, FAMILIES[0]); + int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily()); waitUntilAllRegionsAssigned(countOfRegions); + addToEachStartKey(countOfRegions); } @AfterClass public static void afterAllTests() throws IOException { TEST_UTIL.shutdownMiniCluster(); } + /** + * HBase2482 is about outstanding region openings. If any are outstanding + * when a regionserver goes down, then they'll never deploy. They'll be + * stuck in the regions-in-transition list for ever. This listener looks + * for a region opening HMsg and if its from the server passed on construction, + * then we kill it. It also looks out for a close message on the victim + * server because that signifies start of the fireworks. + */ + static class HBase2482Listener implements RegionServerOperationListener { + private final HRegionServer victim; + private boolean abortSent = false; + // We closed regions on new server. + private volatile boolean closed = false; + // Copy of regions on new server + private final Collection copyOfOnlineRegions; + // This is the region that was in transition on the server we aborted. Test + // passes if this region comes back online successfully. + private HRegionInfo regionToFind; + + HBase2482Listener(final HRegionServer victim) { + this.victim = victim; + // Copy regions currently open on this server so I can notice when + // there is a close. + this.copyOfOnlineRegions = + this.victim.getCopyOfOnlineRegionsSortedBySize().values(); + } + + @Override + public boolean process(HServerInfo serverInfo, HMsg incomingMsg) { + if (!victim.getServerInfo().equals(serverInfo) || + this.abortSent || !this.closed) { + return true; + } + if (!incomingMsg.isType(HMsg.Type.MSG_REPORT_PROCESS_OPEN)) return true; + // Save the region that is in transition so can test later it came back. + this.regionToFind = incomingMsg.getRegionInfo(); + LOG.info("ABORTING " + this.victim + " because got a " + + HMsg.Type.MSG_REPORT_PROCESS_OPEN + " on this server for " + + incomingMsg.getRegionInfo().getRegionNameAsString()); + this.victim.abort(); + this.abortSent = true; + return true; + } + + @Override + public boolean process(RegionServerOperation op) throws IOException { + return true; + } + + @Override + public void processed(RegionServerOperation op) { + if (this.closed || !(op instanceof ProcessRegionClose)) return; + ProcessRegionClose close = (ProcessRegionClose)op; + for (HRegion r: this.copyOfOnlineRegions) { + if (r.getRegionInfo().equals(close.regionInfo)) { + // We've closed one of the regions that was on the victim server. + // Now can start testing for when all regions are back online again + LOG.info("Found close of " + + r.getRegionInfo().getRegionNameAsString() + + "; setting close happened flag"); + this.closed = true; + break; + } + } + } + } + + /** + * In 2482, a RS with an opening region on it dies. The said region is then + * stuck in the master's regions-in-transition and never leaves it. This + * test works by bringing up a new regionserver, waiting for the load + * balancer to give it some regions. Then, we close all on the new server. + * After sending all the close messages, we send the new regionserver the + * special blocking message so it can not process any more messages. + * Meantime reopening of the just-closed regions is backed up on the new + * server. Soon as master gets an opening region from the new regionserver, + * we kill it. We then wait on all regions to combe back on line. If bug + * is fixed, this should happen soon as the processing of the killed server is + * done. + * @see HBASE-2482 + */ + @Test public void testKillRSWithOpeningRegion2482() throws Exception { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + // Count how many regions are online. They need to be all back online for + // this test to succeed. + int countOfMetaRegions = countOfMetaRegions(); + // Add a listener on the server. + HMaster m = cluster.getMaster(); + // Start new regionserver. + MiniHBaseClusterRegionServer hrs = + (MiniHBaseClusterRegionServer)cluster.startRegionServer().getRegionServer(); + LOG.info("Started new regionserver: " + hrs.toString()); + // Wait until has some regions before proceeding. Balancer will give it some. + int minimumRegions = + countOfMetaRegions/(cluster.getRegionServerThreads().size() * 2); + while (hrs.getOnlineRegions().size() < minimumRegions) Threads.sleep(100); + // Set the listener only after some regions have been opened on new server. + HBase2482Listener listener = new HBase2482Listener(hrs); + m.getRegionServerOperationQueue(). + registerRegionServerOperationListener(listener); + try { + // Go close all non-catalog regions on this new server + closeAlltNonCatalogRegions(cluster, hrs); + // After all closes, add blocking message before the region opens start to + // come in. + cluster.addMessageToSendRegionServer(hrs, + new HMsg(HMsg.Type.TESTING_MSG_BLOCK_RS)); + // Wait till one of the above close messages has an effect before we start + // wait on all regions back online. + while (!listener.closed) Threads.sleep(100); + LOG.info("Past close"); + // Make sure the abort server message was sent. + while(!listener.abortSent) Threads.sleep(100); + LOG.info("Past abort send; waiting on all regions to redeploy"); + // Now wait for regions to come back online. + assertRegionIsBackOnline(listener.regionToFind); + } finally { + m.getRegionServerOperationQueue(). + unregisterRegionServerOperationListener(listener); + } + } + + + /* + * @param cluster + * @param hrs + * @return Count of regions closed. + * @throws IOException + */ + private int closeAlltNonCatalogRegions(final MiniHBaseCluster cluster, + final MiniHBaseCluster.MiniHBaseClusterRegionServer hrs) + throws IOException { + int countOfRegions = 0; + for (HRegion r: hrs.getOnlineRegions()) { + if (r.getRegionInfo().isMetaRegion()) continue; + cluster.addMessageToSendRegionServer(hrs, + new HMsg(HMsg.Type.MSG_REGION_CLOSE, r.getRegionInfo())); + LOG.info("Sent close of " + r.getRegionInfo().getRegionNameAsString() + + " on " + hrs.toString()); + countOfRegions++; + } + return countOfRegions; + } + /** * Listener for regionserver events testing hbase-2428 (Infinite loop of * region closes if META region is offline). In particular, listen @@ -167,6 +322,11 @@ public class TestMasterTransistions { int getCloseCount() { return this.closeCount; } + + @Override + public boolean process(HServerInfo serverInfo, HMsg incomingMsg) { + return true; + } } /** @@ -211,24 +371,19 @@ public class TestMasterTransistions { assertTrue(listener.getCloseCount() < ((HBase2428Listener.SERVER_DURATION/HBase2428Listener.CLOSE_DURATION) * 2)); - assertClosedRegionIsBackOnline(hri); + // Assert the closed region came back online + assertRegionIsBackOnline(hri); } finally { master.getRegionServerOperationQueue(). unregisterRegionServerOperationListener(listener); } } - private void assertClosedRegionIsBackOnline(final HRegionInfo hri) + private void assertRegionIsBackOnline(final HRegionInfo hri) throws IOException { - // When we get here, region should be successfully deployed. Assert so. - // 'aaa' is safe as first row if startkey is EMPTY_BYTE_ARRAY because we - // loaded with HBaseTestingUtility#createMultiRegions. - byte [] row = Bytes.equals(HConstants.EMPTY_BYTE_ARRAY, hri.getStartKey())? - new byte [] {'a', 'a', 'a'}: hri.getStartKey(); - Put p = new Put(row); - p.add(FAMILIES[0], FAMILIES[0], FAMILIES[0]); + // Region should have an entry in its startkey because of addRowToEachRegion. + byte [] row = getStartKey(hri); HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); - t.put(p); Get g = new Get(row); assertTrue((t.get(g)).size() > 0); } @@ -256,8 +411,81 @@ public class TestMasterTransistions { rows++; } s.close(); - // If I got to hear and all rows have a Server, then all have been assigned. + // If I get to here and all rows have a Server, then all have been assigned. if (rows == countOfRegions) break; + LOG.info("Found=" + rows); + Threads.sleep(1000); } } + + /* + * @return Count of regions in meta table. + * @throws IOException + */ + private static int countOfMetaRegions() + throws IOException { + HTable meta = new HTable(TEST_UTIL.getConfiguration(), + HConstants.META_TABLE_NAME); + int rows = 0; + Scan scan = new Scan(); + scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + ResultScanner s = meta.getScanner(scan); + for (Result r = null; (r = s.next()) != null;) { + byte [] b = + r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + if (b == null || b.length <= 0) break; + rows++; + } + s.close(); + return rows; + } + + /* + * Add to each of the regions in .META. a value. Key is the startrow of the + * region (except its 'aaa' for first region). Actual value is the row name. + * @param expected + * @return + * @throws IOException + */ + private static int addToEachStartKey(final int expected) throws IOException { + HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); + HTable meta = new HTable(TEST_UTIL.getConfiguration(), + HConstants.META_TABLE_NAME); + int rows = 0; + Scan scan = new Scan(); + scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + ResultScanner s = meta.getScanner(scan); + for (Result r = null; (r = s.next()) != null;) { + byte [] b = + r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + if (b == null || b.length <= 0) break; + HRegionInfo hri = Writables.getHRegionInfo(b); + // If start key, add 'aaa'. + byte [] row = getStartKey(hri); + Put p = new Put(row); + p.add(getTestFamily(), getTestQualifier(), row); + t.put(p); + rows++; + } + s.close(); + Assert.assertEquals(expected, rows); + return rows; + } + + /* + * @param hri + * @return Start key for hri (If start key is '', then return 'aaa'. + */ + private static byte [] getStartKey(final HRegionInfo hri) { + return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())? + Bytes.toBytes("aaa"): hri.getStartKey(); + } + + private static byte [] getTestFamily() { + return FAMILIES[0]; + } + + private static byte [] getTestQualifier() { + return getTestFamily(); + } }