HBASE-5179 Handle potential data loss due to concurrent processing of processFaileOver and ServerShutdownHandler
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1300194 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a728f94079
commit
05423b150b
|
@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.executor.EventHandler.EventType;
|
|||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.executor.RegionTransitionData;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
|
||||
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
|
||||
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
|
||||
|
@ -328,11 +327,13 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
/**
|
||||
* Called on startup.
|
||||
* Figures whether a fresh cluster start of we are joining extant running cluster.
|
||||
* @param onlineServers onlined servers when master started
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void joinCluster() throws IOException, KeeperException, InterruptedException {
|
||||
void joinCluster(final Set<ServerName> onlineServers) throws IOException,
|
||||
KeeperException, InterruptedException {
|
||||
// Concurrency note: In the below the accesses on regionsInTransition are
|
||||
// outside of a synchronization block where usually all accesses to RIT are
|
||||
// synchronized. The presumption is that in this case it is safe since this
|
||||
|
@ -343,7 +344,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
// Scan META to build list of existing regions, servers, and assignment
|
||||
// Returns servers who have not checked in (assumed dead) and their regions
|
||||
Map<ServerName,List<Pair<HRegionInfo,Result>>> deadServers = rebuildUserRegions();
|
||||
Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers = rebuildUserRegions(onlineServers);
|
||||
|
||||
processDeadServersAndRegionsInTransition(deadServers);
|
||||
|
||||
|
@ -353,6 +354,16 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
recoverTableInEnablingState(this.enablingTables, isWatcherCreated);
|
||||
}
|
||||
|
||||
/**
|
||||
* Only used for tests
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void joinCluster() throws IOException, KeeperException, InterruptedException {
|
||||
joinCluster(serverManager.getOnlineServers().keySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Process all regions that are in transition up in zookeeper. Used by
|
||||
* master joining an already running cluster.
|
||||
|
@ -398,6 +409,12 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
|
||||
// Remove regions in RIT, they are possibly being processed by
|
||||
// ServerShutdownHandler.
|
||||
synchronized (regionsInTransition) {
|
||||
nodes.removeAll(regionsInTransition.keySet());
|
||||
}
|
||||
|
||||
// If we found user regions out on cluster, its a failover.
|
||||
if (this.failover) {
|
||||
LOG.info("Found regions out on cluster or in RIT; failover");
|
||||
|
@ -1770,6 +1787,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
final List<ServerName> servers = this.serverManager.getOnlineServersList();
|
||||
final List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
|
||||
|
||||
|
||||
if (serverToExclude != null) servers.remove(serverToExclude);
|
||||
|
||||
// Loop through the draining server list and remove them from the server
|
||||
|
@ -1782,6 +1800,11 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
|
||||
// Remove the deadNotExpired servers from the server list.
|
||||
removeDeadNotExpiredServers(servers);
|
||||
|
||||
|
||||
|
||||
if (servers.isEmpty()) return null;
|
||||
|
||||
RegionPlan randomPlan = new RegionPlan(state.getRegion(), null,
|
||||
|
@ -1813,7 +1836,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
" so generated a random one; " + randomPlan + "; " +
|
||||
serverManager.countOfRegionServers() +
|
||||
" (online=" + serverManager.getOnlineServers().size() +
|
||||
", exclude=" + drainingServers.size() + ") available servers");
|
||||
", available=" + servers.size() + ") available servers");
|
||||
return randomPlan;
|
||||
}
|
||||
LOG.debug("Using pre-existing plan for region " +
|
||||
|
@ -1821,6 +1844,23 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
return existingPlan;
|
||||
}
|
||||
|
||||
/**
|
||||
* Loop through the deadNotExpired server list and remove them from the
|
||||
* servers.
|
||||
* @param servers
|
||||
*/
|
||||
public void removeDeadNotExpiredServers(List<ServerName> servers) {
|
||||
Set<ServerName> deadNotExpiredServers = this.serverManager
|
||||
.getDeadNotExpiredServers();
|
||||
if (!deadNotExpiredServers.isEmpty()) {
|
||||
for (ServerName server : deadNotExpiredServers) {
|
||||
LOG.debug("Removing dead but not expired server: " + server
|
||||
+ " from eligible server pool.");
|
||||
servers.remove(server);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unassign the list of regions. Configuration knobs:
|
||||
* hbase.bulk.waitbetween.reopen indicates the number of milliseconds to
|
||||
|
@ -2134,6 +2174,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
throws IOException,
|
||||
InterruptedException {
|
||||
List<ServerName> servers = this.serverManager.getOnlineServersList();
|
||||
removeDeadNotExpiredServers(servers);
|
||||
assignUserRegions(regions, servers);
|
||||
}
|
||||
|
||||
|
@ -2173,6 +2214,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// Get all available servers
|
||||
List<ServerName> servers = serverManager.getOnlineServersList();
|
||||
|
||||
// Remove the deadNotExpired servers from the server list.
|
||||
removeDeadNotExpiredServers(servers);
|
||||
|
||||
// If there are no servers we need not proceed with region assignment.
|
||||
if(servers.isEmpty()) return;
|
||||
|
||||
|
@ -2377,11 +2421,14 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* <p>
|
||||
* Returns a map of servers that are not found to be online and the regions
|
||||
* they were hosting.
|
||||
* @param onlineServers if one region's location belongs to onlineServers, it
|
||||
* doesn't need to be assigned.
|
||||
* @return map of servers not online to their assigned regions, as stored
|
||||
* in META
|
||||
* @throws IOException
|
||||
*/
|
||||
Map<ServerName, List<Pair<HRegionInfo, Result>>> rebuildUserRegions()
|
||||
Map<ServerName, List<Pair<HRegionInfo, Result>>> rebuildUserRegions(
|
||||
final Set<ServerName> onlineServers)
|
||||
throws IOException, KeeperException {
|
||||
// Region assignment from META
|
||||
List<Result> results = MetaReader.fullScan(this.catalogTracker);
|
||||
|
@ -2414,7 +2461,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
addTheTablesInPartialState(this.disablingTables, this.enablingTables, regionInfo,
|
||||
tableName);
|
||||
} else if (!this.serverManager.isServerOnline(regionLocation)) {
|
||||
} else if (!onlineServers.contains(regionLocation)) {
|
||||
// Region is located on a server that isn't online
|
||||
List<Pair<HRegionInfo, Result>> offlineRegions =
|
||||
offlineServers.get(regionLocation);
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.net.InetAddress;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -191,6 +192,8 @@ Server {
|
|||
private volatile boolean isActiveMaster = false;
|
||||
// flag set after we complete initialization once active (used for testing)
|
||||
private volatile boolean initialized = false;
|
||||
// flag set after we complete assignRootAndMeta.
|
||||
private volatile boolean serverShutdownHandlerEnabled = false;
|
||||
|
||||
// Instance of the hbase executor service.
|
||||
ExecutorService executorService;
|
||||
|
@ -527,13 +530,17 @@ Server {
|
|||
}
|
||||
}
|
||||
|
||||
Set<ServerName> onlineServers = new HashSet<ServerName>(serverManager
|
||||
.getOnlineServers().keySet());
|
||||
// TODO: Should do this in background rather than block master startup
|
||||
status.setStatus("Splitting logs after master startup");
|
||||
this.fileSystemManager.
|
||||
splitLogAfterStartup(this.serverManager.getOnlineServers().keySet());
|
||||
splitLogAfterStartup(this.fileSystemManager, onlineServers);
|
||||
|
||||
// Make sure root and meta assigned before proceeding.
|
||||
assignRootAndMeta(status);
|
||||
serverShutdownHandlerEnabled = true;
|
||||
this.serverManager.expireDeadNotExpiredServers();
|
||||
|
||||
// Update meta with new HRI if required. i.e migrate all HRI with HTD to
|
||||
// HRI with out HTD in meta and update the status in ROOT. This must happen
|
||||
// before we assign all user regions or else the assignment will fail.
|
||||
|
@ -543,7 +550,7 @@ Server {
|
|||
|
||||
// Fixup assignment manager status
|
||||
status.setStatus("Starting assignment manager");
|
||||
this.assignmentManager.joinCluster();
|
||||
this.assignmentManager.joinCluster(onlineServers);
|
||||
|
||||
this.balancer.setClusterStatus(getClusterStatus());
|
||||
this.balancer.setMasterServices(this);
|
||||
|
@ -578,6 +585,16 @@ Server {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override to change master's splitLogAfterStartup. Used testing
|
||||
* @param mfs
|
||||
* @param onlineServers
|
||||
*/
|
||||
protected void splitLogAfterStartup(final MasterFileSystem mfs,
|
||||
Set<ServerName> onlineServers) {
|
||||
mfs.splitLogAfterStartup(onlineServers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check <code>-ROOT-</code> and <code>.META.</code> are assigned. If not,
|
||||
* assign them.
|
||||
|
@ -595,17 +612,11 @@ Server {
|
|||
status.setStatus("Assigning ROOT region");
|
||||
boolean rit = this.assignmentManager.
|
||||
processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO);
|
||||
ServerName expiredServer = null;
|
||||
ServerName currentRootServer = null;
|
||||
if (!catalogTracker.verifyRootRegionLocation(timeout)) {
|
||||
ServerName currentRootServer = this.catalogTracker.getRootLocation();
|
||||
if (expireIfOnline(currentRootServer)) {
|
||||
// We are expiring this server. The processing of expiration will assign
|
||||
// root so don't do it here.
|
||||
expiredServer = currentRootServer;
|
||||
} else {
|
||||
// Root was not on an online server when we failed verification
|
||||
this.assignmentManager.assignRoot();
|
||||
}
|
||||
currentRootServer = this.catalogTracker.getRootLocation();
|
||||
splitLogAndExpireIfOnline(currentRootServer);
|
||||
this.assignmentManager.assignRoot();
|
||||
this.catalogTracker.waitForRoot();
|
||||
//This guarantees that the transition has completed
|
||||
this.assignmentManager.waitForAssignment(HRegionInfo.ROOT_REGIONINFO);
|
||||
|
@ -625,13 +636,11 @@ Server {
|
|||
if (!this.catalogTracker.verifyMetaRegionLocation(timeout)) {
|
||||
ServerName currentMetaServer =
|
||||
this.catalogTracker.getMetaLocationOrReadLocationFromRoot();
|
||||
if (currentMetaServer != null && currentMetaServer.equals(expiredServer)) {
|
||||
// We are expiring the server that is carrying meta already.
|
||||
// The expiration processing will take care of reassigning meta.
|
||||
expireIfOnline(currentMetaServer);
|
||||
} else {
|
||||
this.assignmentManager.assignMeta();
|
||||
if (currentMetaServer != null
|
||||
&& !currentMetaServer.equals(currentRootServer)) {
|
||||
splitLogAndExpireIfOnline(currentMetaServer);
|
||||
}
|
||||
assignmentManager.assignMeta();
|
||||
this.catalogTracker.waitForMeta();
|
||||
// Above check waits for general meta availability but this does not
|
||||
// guarantee that the transition has completed
|
||||
|
@ -682,16 +691,19 @@ Server {
|
|||
}
|
||||
|
||||
/**
|
||||
* Expire a server if we find it is one of the online servers set.
|
||||
* Split a server's log and expire it if we find it is one of the online
|
||||
* servers.
|
||||
* @param sn ServerName to check.
|
||||
* @return True if server was online and so we expired it as unreachable.
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean expireIfOnline(final ServerName sn) {
|
||||
if (sn == null) return false;
|
||||
if (!this.serverManager.isServerOnline(sn)) return false;
|
||||
LOG.info("Forcing expiration of " + sn);
|
||||
this.serverManager.expireServer(sn);
|
||||
return true;
|
||||
private void splitLogAndExpireIfOnline(final ServerName sn)
|
||||
throws IOException {
|
||||
if (sn == null || !serverManager.isServerOnline(sn)) {
|
||||
return;
|
||||
}
|
||||
LOG.info("Forcing splitLog and expire of " + sn);
|
||||
fileSystemManager.splitLog(sn);
|
||||
serverManager.expireServer(sn);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1692,7 +1704,16 @@ Server {
|
|||
public boolean isInitialized() {
|
||||
return initialized;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* ServerShutdownHandlerEnabled is set false before completing
|
||||
* assignRootAndMeta to prevent processing of ServerShutdownHandler.
|
||||
* @return true if assignRootAndMeta has completed;
|
||||
*/
|
||||
public boolean isServerShutdownHandlerEnabled() {
|
||||
return this.serverShutdownHandlerEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public void assign(final byte[] regionName, final boolean force)
|
||||
|
|
|
@ -25,8 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
|
||||
|
@ -94,4 +92,9 @@ public interface MasterServices extends Server {
|
|||
*/
|
||||
public RegionServerTracker getRegionServerTracker();
|
||||
|
||||
/**
|
||||
* @return true if master enables ServerShutdownHandler;
|
||||
*/
|
||||
public boolean isServerShutdownHandlerEnabled();
|
||||
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.net.InetAddress;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -46,10 +48,10 @@ import org.apache.hadoop.hbase.client.HConnection;
|
|||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
|
||||
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
|
||||
/**
|
||||
* The ServerManager class manages info about region servers.
|
||||
|
@ -97,6 +99,14 @@ public class ServerManager {
|
|||
|
||||
private final long maxSkew;
|
||||
|
||||
/**
|
||||
* Set of region servers which are dead but not expired immediately. If one
|
||||
* server died before master enables ServerShutdownHandler, the server will be
|
||||
* added to set and will be expired through calling
|
||||
* {@link ServerManager#expireDeadNotExpiredServers()} by master.
|
||||
*/
|
||||
private Set<ServerName> deadNotExpiredServers = new HashSet<ServerName>();
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param master
|
||||
|
@ -349,6 +359,12 @@ public class ServerManager {
|
|||
* shutdown processing.
|
||||
*/
|
||||
public synchronized void expireServer(final ServerName serverName) {
|
||||
if (!services.isServerShutdownHandlerEnabled()) {
|
||||
LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
|
||||
+ "delay expiring server " + serverName);
|
||||
this.deadNotExpiredServers.add(serverName);
|
||||
return;
|
||||
}
|
||||
excludeRegionServerFromSchemaChanges(serverName);
|
||||
if (!this.onlineServers.containsKey(serverName)) {
|
||||
LOG.warn("Received expiration of " + serverName +
|
||||
|
@ -395,6 +411,22 @@ public class ServerManager {
|
|||
carryingRoot + ", meta=" + carryingMeta);
|
||||
}
|
||||
|
||||
/**
|
||||
* Expire the servers which died during master's initialization. It will be
|
||||
* called after HMaster#assignRootAndMeta.
|
||||
* @throws IOException
|
||||
* */
|
||||
synchronized void expireDeadNotExpiredServers() throws IOException {
|
||||
if (!services.isServerShutdownHandlerEnabled()) {
|
||||
throw new IOException("Master hasn't enabled ServerShutdownHandler ");
|
||||
}
|
||||
Iterator<ServerName> serverIterator = deadNotExpiredServers.iterator();
|
||||
while (serverIterator.hasNext()) {
|
||||
expireServer(serverIterator.next());
|
||||
serverIterator.remove();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Remove the server from the drain list.
|
||||
*/
|
||||
|
@ -606,6 +638,13 @@ public class ServerManager {
|
|||
return new ArrayList<ServerName>(this.drainingServers);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A copy of the internal set of deadNotExpired servers.
|
||||
*/
|
||||
Set<ServerName> getDeadNotExpiredServers() {
|
||||
return new HashSet<ServerName>(this.deadNotExpiredServers);
|
||||
}
|
||||
|
||||
public boolean isServerOnline(ServerName serverName) {
|
||||
return onlineServers.containsKey(serverName);
|
||||
}
|
||||
|
|
|
@ -172,6 +172,8 @@ public class CreateTableHandler extends EventHandler {
|
|||
|
||||
// 4. Trigger immediate assignment of the regions in round-robin fashion
|
||||
List<ServerName> servers = serverManager.getOnlineServersList();
|
||||
// Remove the deadNotExpired servers from the server list.
|
||||
assignmentManager.removeDeadNotExpiredServers(servers);
|
||||
try {
|
||||
this.assignmentManager.assignUserRegions(Arrays.asList(newRegions),
|
||||
servers);
|
||||
|
|
|
@ -780,7 +780,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
// Interrupt catalog tracker here in case any regions being opened out in
|
||||
// handlers are stuck waiting on meta or root.
|
||||
if (this.catalogTracker != null) this.catalogTracker.stop();
|
||||
if (this.fsOk) {
|
||||
if (!this.killed && this.fsOk) {
|
||||
waitOnAllRegionsToClose(abortRequested);
|
||||
LOG.info("stopping server " + this.serverNameFromMasterPOV +
|
||||
"; all regions closed.");
|
||||
|
|
|
@ -22,17 +22,22 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerLoad;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
|
@ -45,6 +50,7 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
|||
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
|
@ -59,10 +65,6 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -115,10 +117,12 @@ public class TestAssignmentManager {
|
|||
this.serverManager = Mockito.mock(ServerManager.class);
|
||||
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
|
||||
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B)).thenReturn(true);
|
||||
final List<ServerName> onlineServers = new ArrayList<ServerName>();
|
||||
onlineServers.add(SERVERNAME_B);
|
||||
onlineServers.add(SERVERNAME_A);
|
||||
Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(onlineServers);
|
||||
final Map<ServerName, HServerLoad> onlineServers = new HashMap<ServerName, HServerLoad>();
|
||||
onlineServers.put(SERVERNAME_B, new HServerLoad());
|
||||
onlineServers.put(SERVERNAME_A, new HServerLoad());
|
||||
Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(
|
||||
new ArrayList<ServerName>(onlineServers.keySet()));
|
||||
Mockito.when(this.serverManager.getOnlineServers()).thenReturn(onlineServers);
|
||||
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, REGIONINFO, -1)).
|
||||
thenReturn(true);
|
||||
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_B, REGIONINFO, -1)).
|
||||
|
|
|
@ -35,7 +35,17 @@ import java.util.TreeMap;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
|
@ -260,6 +270,11 @@ public class TestCatalogJanitor {
|
|||
public RegionServerTracker getRegionServerTracker() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isServerShutdownHandlerEnabled() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -0,0 +1,261 @@
|
|||
/*
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.master.TestMasterFailover;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestRSKilledWhenMasterInitializing {
|
||||
private static final Log LOG = LogFactory.getLog(TestMasterFailover.class);
|
||||
|
||||
private static final HBaseTestingUtility TESTUTIL = new HBaseTestingUtility();
|
||||
private static final int NUM_MASTERS = 1;
|
||||
private static final int NUM_RS = 4;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
// Set it so that this test runs with my custom master
|
||||
TESTUTIL.getConfiguration().setClass(HConstants.MASTER_IMPL,
|
||||
TestingMaster.class, HMaster.class);
|
||||
// Start up the cluster.
|
||||
TESTUTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
if (!TESTUTIL.getHBaseCluster().getMaster().isInitialized()) {
|
||||
// master is not initialized and is waiting something forever.
|
||||
for (MasterThread mt : TESTUTIL.getHBaseCluster().getLiveMasterThreads()) {
|
||||
mt.interrupt();
|
||||
}
|
||||
}
|
||||
TESTUTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* An HMaster instance used in this test. If 'TestingMaster.sleep' is set in
|
||||
* the Configuration, then we'll sleep after log is split and we'll also
|
||||
* return a custom RegionServerTracker.
|
||||
*/
|
||||
public static class TestingMaster extends HMaster {
|
||||
private boolean logSplit = false;
|
||||
|
||||
public TestingMaster(Configuration conf) throws IOException,
|
||||
KeeperException, InterruptedException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void splitLogAfterStartup(MasterFileSystem mfs,
|
||||
Set<ServerName> onlineServers) {
|
||||
super.splitLogAfterStartup(mfs, onlineServers);
|
||||
logSplit = true;
|
||||
// If "TestingMaster.sleep" is set, sleep after log split.
|
||||
if (getConfiguration().getBoolean("TestingMaster.sleep", false)) {
|
||||
int duration = getConfiguration().getInt(
|
||||
"TestingMaster.sleep.duration", 0);
|
||||
Threads.sleep(duration);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public boolean isLogSplitAfterStartup() {
|
||||
return logSplit;
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testCorrectnessWhenMasterFailOver() throws Exception {
|
||||
final byte[] TABLENAME = Bytes.toBytes("testCorrectnessWhenMasterFailOver");
|
||||
final byte[] FAMILY = Bytes.toBytes("family");
|
||||
final byte[][] SPLITKEYS = { Bytes.toBytes("b"), Bytes.toBytes("i") };
|
||||
|
||||
MiniHBaseCluster cluster = TESTUTIL.getHBaseCluster();
|
||||
|
||||
HTableDescriptor desc = new HTableDescriptor(TABLENAME);
|
||||
desc.addFamily(new HColumnDescriptor(FAMILY));
|
||||
HBaseAdmin hbaseAdmin = TESTUTIL.getHBaseAdmin();
|
||||
hbaseAdmin.createTable(desc, SPLITKEYS);
|
||||
|
||||
assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
|
||||
|
||||
HTable table = new HTable(TESTUTIL.getConfiguration(), TABLENAME);
|
||||
List<Put> puts = new ArrayList<Put>();
|
||||
Put put1 = new Put(Bytes.toBytes("a"));
|
||||
put1.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
|
||||
Put put2 = new Put(Bytes.toBytes("h"));
|
||||
put2.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
|
||||
Put put3 = new Put(Bytes.toBytes("o"));
|
||||
put3.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
|
||||
puts.add(put1);
|
||||
puts.add(put2);
|
||||
puts.add(put3);
|
||||
table.put(puts);
|
||||
ResultScanner resultScanner = table.getScanner(new Scan());
|
||||
int count = 0;
|
||||
while (resultScanner.next() != null) {
|
||||
count++;
|
||||
}
|
||||
resultScanner.close();
|
||||
table.close();
|
||||
assertEquals(3, count);
|
||||
|
||||
/* Starting test */
|
||||
cluster.getConfiguration().setBoolean("TestingMaster.sleep", true);
|
||||
cluster.getConfiguration().setInt("TestingMaster.sleep.duration", 10000);
|
||||
|
||||
/* NO.1 .META. region correctness */
|
||||
// First abort master
|
||||
abortMaster(cluster);
|
||||
TestingMaster master = startMasterAndWaitUntilLogSplit(cluster);
|
||||
|
||||
// Second kill meta server
|
||||
int metaServerNum = cluster.getServerWithMeta();
|
||||
int rootServerNum = cluster.getServerWith(HRegionInfo.ROOT_REGIONINFO
|
||||
.getRegionName());
|
||||
HRegionServer metaRS = cluster.getRegionServer(metaServerNum);
|
||||
LOG.debug("Killing metaRS and carryingRoot = "
|
||||
+ (metaServerNum == rootServerNum));
|
||||
metaRS.kill();
|
||||
metaRS.join();
|
||||
|
||||
/*
|
||||
* Sleep double time of TestingMaster.sleep.duration, so we can ensure that
|
||||
* master has already assigned ROOTandMETA or is blocking on assigning
|
||||
* ROOTandMETA
|
||||
*/
|
||||
Thread.sleep(10000 * 2);
|
||||
|
||||
waitUntilMasterIsInitialized(master);
|
||||
|
||||
// Third check whether data is correct in meta region
|
||||
assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
|
||||
|
||||
/*
|
||||
* NO.2 -ROOT- region correctness . If the .META. server killed in the NO.1
|
||||
* is also carrying -ROOT- region, it is not needed
|
||||
*/
|
||||
if (rootServerNum != metaServerNum) {
|
||||
// First abort master
|
||||
abortMaster(cluster);
|
||||
master = startMasterAndWaitUntilLogSplit(cluster);
|
||||
|
||||
// Second kill meta server
|
||||
HRegionServer rootRS = cluster.getRegionServer(rootServerNum);
|
||||
LOG.debug("Killing rootRS");
|
||||
rootRS.kill();
|
||||
rootRS.join();
|
||||
|
||||
/*
|
||||
* Sleep double time of TestingMaster.sleep.duration, so we can ensure
|
||||
* that master has already assigned ROOTandMETA or is blocking on
|
||||
* assigning ROOTandMETA
|
||||
*/
|
||||
Thread.sleep(10000 * 2);
|
||||
waitUntilMasterIsInitialized(master);
|
||||
|
||||
// Third check whether data is correct in meta region
|
||||
assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
|
||||
}
|
||||
|
||||
/* NO.3 data region correctness */
|
||||
ServerManager serverManager = cluster.getMaster().getServerManager();
|
||||
while (serverManager.areDeadServersInProgress()) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
table = new HTable(TESTUTIL.getConfiguration(), TABLENAME);
|
||||
resultScanner = table.getScanner(new Scan());
|
||||
count = 0;
|
||||
while (resultScanner.next() != null) {
|
||||
count++;
|
||||
}
|
||||
resultScanner.close();
|
||||
table.close();
|
||||
assertEquals(3, count);
|
||||
}
|
||||
|
||||
private void abortMaster(MiniHBaseCluster cluster)
|
||||
throws InterruptedException {
|
||||
for (MasterThread mt : cluster.getLiveMasterThreads()) {
|
||||
if (mt.getMaster().isActiveMaster()) {
|
||||
mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
|
||||
mt.join();
|
||||
break;
|
||||
}
|
||||
}
|
||||
LOG.debug("Master is aborted");
|
||||
}
|
||||
|
||||
private TestingMaster startMasterAndWaitUntilLogSplit(MiniHBaseCluster cluster)
|
||||
throws IOException, InterruptedException {
|
||||
TestingMaster master = (TestingMaster) cluster.startMaster().getMaster();
|
||||
while (!master.isLogSplitAfterStartup()) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
LOG.debug("splitted:" + master.isLogSplitAfterStartup() + ",initialized:"
|
||||
+ master.isInitialized());
|
||||
return master;
|
||||
}
|
||||
|
||||
private void waitUntilMasterIsInitialized(HMaster master)
|
||||
throws InterruptedException {
|
||||
while (!master.isInitialized()) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
LOG.debug("master isInitialized");
|
||||
}
|
||||
|
||||
@org.junit.Rule
|
||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||
|
||||
}
|
Loading…
Reference in New Issue