HBASE-13616 Move ServerShutdownHandler to Pv2

stack 2015-05-28 17:13:54 -07:00
parent e61bf1bf25
commit 325614220f
45 changed files with 3258 additions and 1052 deletions

View File

@ -846,7 +846,7 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
}
/**
* Convert a HRegionInfo to a RegionInfo
* Convert a HRegionInfo to the protobuf RegionInfo
*
* @return the converted RegionInfo
*/

View File

@ -168,7 +168,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
// check whether there is multi support. If yes, use it.
if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
LOG.info("Atomically moving " + regionserverZnode + "'s wals to my queue");
LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue");
newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
} else {
LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
@ -336,9 +336,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
}
// add delete op for dead rs
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
LOG.debug(" The multi list size is: " + listOfOps.size());
if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
LOG.info("Atomically moved the dead regionserver logs. ");
if (LOG.isTraceEnabled()) LOG.trace("Atomically moved the dead regionserver logs. ");
} catch (KeeperException e) {
// Multi call failed; it looks like some other regionserver took away the logs.
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);

View File

@ -97,7 +97,7 @@ public class MetaTableLocator {
}
/**
*
*
* @param zkw
* @param replicaId
* @return meta table regions and their locations.
@ -120,7 +120,7 @@ public class MetaTableLocator {
}
/**
*
*
* @param zkw
* @param replicaId
* @return List of meta regions

View File

@ -2665,7 +2665,7 @@ public class Bytes implements Comparable<Bytes> {
/**
* Find index of passed delimiter walking from end of buffer backwards.
*
*
* @param b
* @param delimiter
* @return Index of delimiter
@ -2684,15 +2684,15 @@ public class Bytes implements Comparable<Bytes> {
}
return result;
}
public static int findCommonPrefix(byte[] left, byte[] right, int leftLength, int rightLength,
int leftOffset, int rightOffset) {
int length = Math.min(leftLength, rightLength);
int result = 0;
while (result < length && left[leftOffset + result] == right[rightOffset + result]) {
result++;
}
return result;
}
}
}
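The findCommonPrefix helper added above returns the number of leading bytes shared by two byte ranges, each read from its own offset. A minimal usage sketch; the caller below is illustrative and not part of this change:

import org.apache.hadoop.hbase.util.Bytes;

public class CommonPrefixExample {
  public static void main(String[] args) {
    byte[] left = Bytes.toBytes("rowkey-0001");
    byte[] right = Bytes.toBytes("rowkey-0099");
    // Compare both arrays in full, starting at offset 0 in each.
    int common = Bytes.findCommonPrefix(left, right, left.length, right.length, 0, 0);
    System.out.println(common); // prints 9: "rowkey-00" is shared before the first differing byte
  }
}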

View File

@ -168,6 +168,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
// no-op
}
/**
* By default, the executor will run procedures start to finish. Return true to make the executor
* yield between each flow step to give other procedures time to run their flow steps.
* @return Return true if the executor should yield on completion of a flow state step.
* Defaults to return false.
*/
protected boolean isYieldAfterSuccessfulFlowStateStep() {
return false;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@ -691,4 +701,4 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return proc;
}
}
}
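The isYieldAfterSuccessfulFlowStateStep() hook added above lets a procedure hand control back to the executor between flow steps instead of running start to finish. A sketch of how a subclass might opt in; the subclass name is made up and the remaining abstract Procedure methods are omitted for brevity:

// Illustrative subclass, not from this patch: a long-running procedure that
// yields after every successful step so other queued procedures get scheduled.
public class RegionSweepProcedure extends Procedure<MasterProcedureEnv> {
  @Override
  protected boolean isYieldAfterSuccessfulFlowStateStep() {
    // Give other procedures (e.g. a crashed-server procedure) a turn
    // rather than holding the executor for this procedure's whole flow.
    return true;
  }
  // execute, rollback, abort and the (de)serialize methods elided here.
}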

View File

@ -148,8 +148,8 @@ public class ProcedureExecutor<TEnvironment> {
public void periodicExecute(final TEnvironment env) {
if (completed.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No completed procedures to cleanup.");
if (LOG.isTraceEnabled()) {
LOG.trace("No completed procedures to cleanup.");
}
return;
}
@ -1134,4 +1134,4 @@ public class ProcedureExecutor<TEnvironment> {
}
return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getResult());
}
}
}

View File

@ -183,3 +183,25 @@ message DisableTableStateData {
required TableName table_name = 2;
required bool skip_table_state_check = 3;
}
message ServerCrashStateData {
required ServerName server_name = 1;
optional bool distributed_log_replay = 2;
repeated RegionInfo regions_on_crashed_server = 3;
repeated RegionInfo regions_to_assign = 4;
optional bool carrying_meta = 5;
optional bool should_split_wal = 6 [default = true];
}
enum ServerCrashState {
SERVER_CRASH_START = 1;
SERVER_CRASH_PROCESS_META = 2;
SERVER_CRASH_GET_REGIONS = 3;
SERVER_CRASH_NO_SPLIT_LOGS = 4;
SERVER_CRASH_SPLIT_LOGS = 5;
SERVER_CRASH_PREPARE_LOG_REPLAY = 6;
SERVER_CRASH_CALC_REGIONS_TO_ASSIGN = 7;
SERVER_CRASH_ASSIGN = 8;
SERVER_CRASH_WAIT_ON_ASSIGN = 9;
SERVER_CRASH_FINISH = 100;
}
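Together, ServerCrashStateData and ServerCrashState let the master persist how far crash handling has progressed and resume it after a master restart. A simplified sketch of one state-driven step, assuming the generated ServerCrashState enum; the transitions and helper calls below are illustrative, not the actual ServerCrashProcedure logic:

// Illustration only: advance one step of a crash-handling state machine.
switch (state) {
  case SERVER_CRASH_START:
    // Meta-carrying servers take the PROCESS_META path first.
    state = carryingMeta ? ServerCrashState.SERVER_CRASH_PROCESS_META
                         : ServerCrashState.SERVER_CRASH_GET_REGIONS;
    break;
  case SERVER_CRASH_SPLIT_LOGS:
    splitLogs(serverName);                       // placeholder helper
    state = ServerCrashState.SERVER_CRASH_CALC_REGIONS_TO_ASSIGN;
    break;
  case SERVER_CRASH_ASSIGN:
    assign(regionsToAssign);                     // placeholder helper
    state = ServerCrashState.SERVER_CRASH_WAIT_ON_ASSIGN;
    break;
  case SERVER_CRASH_FINISH:
    return;                                      // persisted as done; never replayed
  default:
    throw new IllegalStateException("Unexpected state " + state);
}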

View File

@ -69,7 +69,7 @@ import org.apache.zookeeper.data.Stat;
/**
* ZooKeeper based implementation of
* {@link org.apache.hadoop.hbase.master.SplitLogManagerCoordination}
* {@link SplitLogManagerCoordination}
*/
@InterfaceAudience.Private
public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
@ -647,7 +647,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
ZKUtil.createSetData(this.watcher, nodePath,
ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
if (LOG.isDebugEnabled()) {
LOG.debug("Marked " + regionEncodeName + " as recovering from " + serverName +
LOG.debug("Marked " + regionEncodeName + " recovering from " + serverName +
": " + nodePath);
}
// break retry loop
@ -684,7 +684,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
/**
* ZooKeeper implementation of
* {@link org.apache.hadoop.hbase.master.
* {@link org.apache.hadoop.hbase.coordination.
* SplitLogManagerCoordination#removeStaleRecoveringRegions(Set)}
*/
@Override
@ -789,7 +789,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
public void setRecoveryMode(boolean isForInitialization) throws IOException {
synchronized(this) {
if (this.isDrainingDone) {
// when there is no outstanding splitlogtask after master start up, we already have up to
// when there is no outstanding splitlogtask after master start up, we already have up to
// date recovery mode
return;
}
@ -920,9 +920,9 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
/**
* {@link org.apache.hadoop.hbase.master.SplitLogManager} can use objects implementing this
* interface to finish off a partially done task by
* {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}. This provides a
* {@link org.apache.hadoop.hbase.master.SplitLogManager} can use objects implementing this
* interface to finish off a partially done task by
* {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}. This provides a
* serialization point at the end of the task processing. Must be restartable and idempotent.
*/
public interface TaskFinisher {

View File

@ -104,7 +104,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
@Override
public void nodeChildrenChanged(String path) {
if (path.equals(watcher.splitLogZNode)) {
LOG.debug("tasks arrived or departed");
if (LOG.isTraceEnabled()) LOG.trace("tasks arrived or departed on " + path);
synchronized (taskReadyLock) {
taskReadySeq++;
taskReadyLock.notify();

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -392,9 +393,10 @@ public class AssignmentManager {
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
* @throws CoordinatedStateException
*/
void joinCluster() throws IOException,
KeeperException, InterruptedException {
void joinCluster()
throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
long startTime = System.currentTimeMillis();
// Concurrency note: In the below the accesses on regionsInTransition are
// outside of a synchronization block where usually all accesses to RIT are
@ -410,8 +412,7 @@ public class AssignmentManager {
Set<ServerName> deadServers = rebuildUserRegions();
// This method will assign all user regions if a clean server startup or
// it will reconstruct master state and cleanup any leftovers from
// previous master process.
// it will reconstruct master state and cleanup any leftovers from previous master process.
boolean failover = processDeadServersAndRegionsInTransition(deadServers);
recoverTableInDisablingState();
@ -422,16 +423,17 @@ public class AssignmentManager {
/**
* Process all regions that are in transition in zookeeper and also
* processes the list of dead servers by scanning the META.
* processes the list of dead servers.
* Used by master joining an cluster. If we figure this is a clean cluster
* startup, will assign all user regions.
* @param deadServers
* Map of dead servers and their regions. Can be null.
* @param deadServers Set of servers that are offline probably legitimately that were carrying
* regions according to a scan of hbase:meta. Can be null.
* @throws IOException
* @throws InterruptedException
*/
boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
throws IOException, InterruptedException {
throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
// TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
boolean failover = !serverManager.getDeadServers().isEmpty();
if (failover) {
// This may not be a failover actually, especially if meta is on this master.
@ -1483,15 +1485,13 @@ public class AssignmentManager {
}
// Generate a round-robin bulk assignment plan
Map<ServerName, List<HRegionInfo>> bulkPlan
= balancer.roundRobinAssignment(regions, servers);
Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
if (bulkPlan == null) {
throw new IOException("Unable to determine a plan to assign region(s)");
}
processFavoredNodes(regions);
assign(regions.size(), servers.size(),
"round-robin=true", bulkPlan);
assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
}
private void assign(int regions, int totalServers,
@ -1607,10 +1607,8 @@ public class AssignmentManager {
/**
* Rebuild the list of user regions and assignment information.
* <p>
* Returns a set of servers that are not found to be online that hosted
* some regions.
* @return set of servers not online that hosted some regions per meta
* Updates regionstates with findings as we go through list of regions.
* @return set of servers not online that hosted some regions according to a scan of hbase:meta
* @throws IOException
*/
Set<ServerName> rebuildUserRegions() throws
@ -2058,15 +2056,15 @@ public class AssignmentManager {
}
/**
* Process shutdown server removing any assignments.
* Clean out crashed server removing any assignments.
* @param sn Server that went down.
* @return list of regions in transition on this server
*/
public List<HRegionInfo> processServerShutdown(final ServerName sn) {
public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
// Clean out any existing assignment plans for this server
synchronized (this.regionPlans) {
for (Iterator <Map.Entry<String, RegionPlan>> i =
this.regionPlans.entrySet().iterator(); i.hasNext();) {
for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
i.hasNext();) {
Map.Entry<String, RegionPlan> e = i.next();
ServerName otherSn = e.getValue().getDestination();
// The name will be null if the region is planned for a random assign.
@ -2084,8 +2082,7 @@ public class AssignmentManager {
// We need a lock on the region as we could update it
Lock lock = locker.acquireLock(encodedName);
try {
RegionState regionState =
regionStates.getRegionTransitionState(encodedName);
RegionState regionState = regionStates.getRegionTransitionState(encodedName);
if (regionState == null
|| (regionState.getServerName() != null && !regionState.isOnServer(sn))
|| !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,

View File

@ -38,6 +38,7 @@ import java.util.Set;
/**
* Class to hold dead servers list and utility querying dead server list.
* On znode expiration, servers are added here.
*/
@InterfaceAudience.Private
public class DeadServer {
@ -115,7 +116,7 @@ public class DeadServer {
}
public synchronized void finish(ServerName sn) {
LOG.debug("Finished processing " + sn);
if (LOG.isDebugEnabled()) LOG.debug("Finished " + sn + "; numProcessing=" + this.numProcessing);
this.numProcessing--;
}

View File

@ -265,7 +265,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
volatile boolean serviceStarted = false;
// flag set after we complete assignMeta.
private volatile boolean serverShutdownHandlerEnabled = false;
private volatile boolean serverCrashProcessingEnabled = false;
LoadBalancer balancer;
private BalancerChore balancerChore;
@ -669,11 +669,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// get a list for previously failed RS which need log splitting work
// we recover hbase:meta region servers inside master initialization and
// handle other failed servers in SSH in order to start up master node ASAP
Set<ServerName> previouslyFailedServers = this.fileSystemManager
.getFailedServersFromLogFolders();
// remove stale recovering regions from previous run
this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
Set<ServerName> previouslyFailedServers =
this.fileSystemManager.getFailedServersFromLogFolders();
// log splitting for hbase:meta server
ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
@ -707,14 +704,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// Check if master is shutting down because of some issue
// in initializing the regionserver or the balancer.
if(isStopped()) return;
if (isStopped()) return;
// Make sure meta assigned before proceeding.
status.setStatus("Assigning Meta Region");
assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
// check if master is shutting down because above assignMeta could return even hbase:meta isn't
// assigned when master is shutting down
if(isStopped()) return;
if (isStopped()) return;
// migrating existent table state from zk, so splitters
// and recovery process treat states properly.
@ -736,11 +733,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
status.setStatus("Starting assignment manager");
this.assignmentManager.joinCluster();
//set cluster status again after user regions are assigned
// set cluster status again after user regions are assigned
this.balancer.setClusterStatus(getClusterStatus());
// Start balancer and meta catalog janitor after meta and regions have
// been assigned.
// Start balancer and meta catalog janitor after meta and regions have been assigned.
status.setStatus("Starting balancer and catalog janitor");
this.clusterStatusChore = new ClusterStatusChore(this, balancer);
getChoreService().scheduleChore(clusterStatusChore);
@ -763,6 +759,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
status.markComplete("Initialization successful");
LOG.info("Master has completed initialization");
configurationManager.registerObserver(this.balancer);
// Set master as 'initialized'.
initialized = true;
// assign the meta replicas
Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
@ -910,7 +907,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// if the meta region server is died at this time, we need it to be re-assigned
// by SSH so that system tables can be assigned.
// No need to wait for meta is assigned = 0 when meta is just verified.
if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableServerShutdownHandler(assigned != 0);
if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0);
LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", location="
+ metaTableLocator.getMetaRegionLocation(this.getZooKeeper(), replicaId));
status.setStatus("META assigned.");
@ -946,15 +943,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
}
}
private void enableServerShutdownHandler(
final boolean waitForMeta) throws IOException, InterruptedException {
// If ServerShutdownHandler is disabled, we enable it and expire those dead
// but not expired servers. This is required so that if meta is assigning to
// a server which dies after assignMeta starts assignment,
// SSH can re-assign it. Otherwise, we will be
private void enableCrashedServerProcessing(final boolean waitForMeta)
throws IOException, InterruptedException {
// If crashed server processing is disabled, we enable it and expire those dead but not expired
// servers. This is required so that if meta is assigning to a server which dies after
// assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
// stuck here waiting forever if waitForMeta is specified.
if (!serverShutdownHandlerEnabled) {
serverShutdownHandlerEnabled = true;
if (!serverCrashProcessingEnabled) {
serverCrashProcessingEnabled = true;
this.serverManager.processQueuedDeadServers();
}
@ -2065,13 +2061,18 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
}
/**
* ServerShutdownHandlerEnabled is set false before completing
* assignMeta to prevent processing of ServerShutdownHandler.
* ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing
* of crashed servers.
* @return true if assignMeta has completed;
*/
@Override
public boolean isServerShutdownHandlerEnabled() {
return this.serverShutdownHandlerEnabled;
public boolean isServerCrashProcessingEnabled() {
return this.serverCrashProcessingEnabled;
}
@VisibleForTesting
public void setServerCrashProcessingEnabled(final boolean b) {
this.serverCrashProcessingEnabled = b;
}
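The @VisibleForTesting setter above exposes the new flag so tests can hold off crash processing, mirroring the old enableServerShutdownHandler gate. A hypothetical test-side use; everything here other than the setter itself is an assumption:

// Hypothetical test sketch, not from this patch.
HMaster master = cluster.getMaster();            // e.g. from a MiniHBaseCluster
master.setServerCrashProcessingEnabled(false);   // expired servers queue up instead of being processed
// ... stop a region server here and verify nothing is reassigned yet ...
master.setServerCrashProcessingEnabled(true);    // crash processing may now proceed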
/**

View File

@ -59,6 +59,8 @@ import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;
import com.google.common.annotations.VisibleForTesting;
/**
* This class abstracts a bunch of operations the HMaster needs to interact with
* the underlying file system, including splitting log files, checking file
@ -132,6 +134,11 @@ public class MasterFileSystem {
this.distributedLogReplay = this.splitLogManager.isLogReplaying();
}
@VisibleForTesting
SplitLogManager getSplitLogManager() {
return this.splitLogManager;
}
/**
* Create initial layout in filesystem.
* <ol>

View File

@ -182,7 +182,7 @@ public interface MasterServices extends Server {
/**
* @return true if master enables ServerShutdownHandler;
*/
boolean isServerShutdownHandlerEnabled();
boolean isServerCrashProcessingEnabled();
/**
* Registers a new protocol buffer {@link Service} subclass as a master coprocessor endpoint.

View File

@ -388,8 +388,7 @@ public class RegionStates {
return updateRegionState(hri, state, serverName, HConstants.NO_SEQNUM);
}
public void regionOnline(
final HRegionInfo hri, final ServerName serverName) {
public void regionOnline(final HRegionInfo hri, final ServerName serverName) {
regionOnline(hri, serverName, HConstants.NO_SEQNUM);
}
@ -398,16 +397,14 @@ public class RegionStates {
* We can't confirm it is really online on specified region server
* because it hasn't been put in region server's online region list yet.
*/
public void regionOnline(final HRegionInfo hri,
final ServerName serverName, long openSeqNum) {
public void regionOnline(final HRegionInfo hri, final ServerName serverName, long openSeqNum) {
String encodedName = hri.getEncodedName();
if (!serverManager.isServerOnline(serverName)) {
// This is possible if the region server dies before master gets a
// chance to handle ZK event in time. At this time, if the dead server
// is already processed by SSH, we should ignore this event.
// If not processed yet, ignore and let SSH deal with it.
LOG.warn("Ignored, " + encodedName
+ " was opened on a dead server: " + serverName);
LOG.warn("Ignored, " + encodedName + " was opened on a dead server: " + serverName);
return;
}
updateRegionState(hri, State.OPEN, serverName, openSeqNum);
@ -489,7 +486,7 @@ public class RegionStates {
}
long now = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("Adding to processed servers " + serverName);
LOG.debug("Adding to log splitting servers " + serverName);
}
processedServers.put(serverName, Long.valueOf(now));
Configuration conf = server.getConfiguration();
@ -503,7 +500,7 @@ public class RegionStates {
Map.Entry<ServerName, Long> e = it.next();
if (e.getValue().longValue() < cutoff) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removed from processed servers " + e.getKey());
LOG.debug("Removed from log splitting servers " + e.getKey());
}
it.remove();
}

View File

@ -52,8 +52,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@ -579,7 +578,7 @@ public class ServerManager {
}
return;
}
if (!services.isServerShutdownHandlerEnabled()) {
if (!services.isServerCrashProcessingEnabled()) {
LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
+ "delay expiring server " + serverName);
this.queuedDeadServers.add(serverName);
@ -591,18 +590,8 @@ public class ServerManager {
" but server shutdown already in progress");
return;
}
synchronized (onlineServers) {
if (!this.onlineServers.containsKey(serverName)) {
LOG.warn("Expiration of " + serverName + " but server not online");
}
// Remove the server from the known servers lists and update load info BUT
// add to deadservers first; do this so it'll show in dead servers list if
// not in online servers list.
this.deadservers.add(serverName);
this.onlineServers.remove(serverName);
onlineServers.notifyAll();
}
this.rsAdmins.remove(serverName);
moveFromOnelineToDeadServers(serverName);
// If cluster is going down, yes, servers are going to be expiring; don't
// process as a dead server
if (this.clusterShutdown) {
@ -615,13 +604,8 @@ public class ServerManager {
}
boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);
if (carryingMeta) {
this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
this.services, this.deadservers, serverName));
} else {
this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
this.services, this.deadservers, serverName, true));
}
this.services.getMasterProcedureExecutor().
submitProcedure(new ServerCrashProcedure(serverName, true, carryingMeta));
LOG.debug("Added=" + serverName +
" to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
@ -633,8 +617,20 @@ public class ServerManager {
}
}
public synchronized void processDeadServer(final ServerName serverName) {
this.processDeadServer(serverName, false);
@VisibleForTesting
public void moveFromOnelineToDeadServers(final ServerName sn) {
synchronized (onlineServers) {
if (!this.onlineServers.containsKey(sn)) {
LOG.warn("Expiration of " + sn + " but server not online");
}
// Remove the server from the known servers lists and update load info BUT
// add to deadservers first; do this so it'll show in dead servers list if
// not in online servers list.
this.deadservers.add(sn);
this.onlineServers.remove(sn);
onlineServers.notifyAll();
}
this.rsAdmins.remove(sn);
}
public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
@ -652,9 +648,8 @@ public class ServerManager {
}
this.deadservers.add(serverName);
this.services.getExecutorService().submit(
new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
shouldSplitWal));
this.services.getMasterProcedureExecutor().
submitProcedure(new ServerCrashProcedure(serverName, shouldSplitWal, false));
}
/**
@ -662,7 +657,7 @@ public class ServerManager {
* called after HMaster#assignMeta and AssignmentManager#joinCluster.
* */
synchronized void processQueuedDeadServers() {
if (!services.isServerShutdownHandlerEnabled()) {
if (!services.isServerCrashProcessingEnabled()) {
LOG.info("Master hasn't enabled ServerShutdownHandler");
}
Iterator<ServerName> serverIterator = queuedDeadServers.iterator();

View File

@ -406,16 +406,15 @@ public class SplitLogManager {
// the function is only used in WALEdit direct replay mode
return;
}
if (serverNames == null || serverNames.isEmpty()) return;
Set<String> recoveredServerNameSet = new HashSet<String>();
if (serverNames != null) {
for (ServerName tmpServerName : serverNames) {
recoveredServerNameSet.add(tmpServerName.getServerName());
}
for (ServerName tmpServerName : serverNames) {
recoveredServerNameSet.add(tmpServerName.getServerName());
}
this.recoveringRegionLock.lock();
try {
this.recoveringRegionLock.lock();
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
.getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet,
isMetaRecovery);

View File

@ -25,17 +25,16 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.InterProcessLock;
import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
import org.apache.hadoop.hbase.InterProcessReadWriteLock;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

View File

@ -1,88 +0,0 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices;
/**
* Handle logReplay work from SSH. Having a separate handler is not to block SSH in re-assigning
* regions from dead servers. Otherwise, available SSH handlers could be blocked by logReplay work
* (from {@link org.apache.hadoop.hbase.master.MasterFileSystem#splitLog(ServerName)}).
* During logReplay, if a receiving RS(say A) fails again, regions on A won't be able
* to be assigned to another live RS which causes the log replay unable to complete
* because WAL edits replay depends on receiving RS to be live
*/
@InterfaceAudience.Private
public class LogReplayHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(LogReplayHandler.class);
private final ServerName serverName;
protected final Server master;
protected final MasterServices services;
protected final DeadServer deadServers;
public LogReplayHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName) {
super(server, EventType.M_LOG_REPLAY);
this.master = server;
this.services = services;
this.deadServers = deadServers;
this.serverName = serverName;
this.deadServers.add(serverName);
}
@Override
public String toString() {
String name = serverName.toString();
return getClass().getSimpleName() + "-" + name + "-" + getSeqid();
}
@Override
public void process() throws IOException {
try {
if (this.master != null && this.master.isStopped()) {
// we're exiting ...
return;
}
this.services.getMasterFileSystem().splitLog(serverName);
} catch (Exception ex) {
if (ex instanceof IOException) {
// resubmit log replay work when failed
this.services.getExecutorService().submit((LogReplayHandler) this);
this.deadServers.add(serverName);
throw new IOException("failed log replay for " + serverName + ", will retry", ex);
} else {
throw new IOException(ex);
}
} finally {
this.deadServers.finish(serverName);
}
// logReplay is the last step of SSH so log a line to indicate that
LOG.info("Finished processing shutdown of " + serverName);
}
}

View File

@ -1,216 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Shutdown handler for the server hosting <code>hbase:meta</code>
*/
@InterfaceAudience.Private
public class MetaServerShutdownHandler extends ServerShutdownHandler {
private static final Log LOG = LogFactory.getLog(MetaServerShutdownHandler.class);
private AtomicInteger eventExceptionCount = new AtomicInteger(0);
@VisibleForTesting
static final int SHOW_STRACKTRACE_FREQUENCY = 100;
public MetaServerShutdownHandler(final Server server,
final MasterServices services,
final DeadServer deadServers, final ServerName serverName) {
super(server, services, deadServers, serverName,
EventType.M_META_SERVER_SHUTDOWN, true);
}
@Override
public void process() throws IOException {
boolean gotException = true;
try {
AssignmentManager am = this.services.getAssignmentManager();
this.services.getMasterFileSystem().setLogRecoveryMode();
boolean distributedLogReplay =
(this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
try {
if (this.shouldSplitWal) {
LOG.info("Splitting hbase:meta logs for " + serverName);
if (distributedLogReplay) {
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);
} else {
this.services.getMasterFileSystem().splitMetaLog(serverName);
}
am.getRegionStates().logSplit(HRegionInfo.FIRST_META_REGIONINFO);
}
} catch (IOException ioe) {
this.services.getExecutorService().submit(this);
this.deadServers.add(serverName);
throw new IOException("failed log splitting for " + serverName + ", will retry", ioe);
}
// Assign meta if we were carrying it.
// Check again: region may be assigned to other where because of RIT
// timeout
if (am.isCarryingMeta(serverName)) {
LOG.info("Server " + serverName + " was carrying META. Trying to assign.");
verifyAndAssignMetaWithRetries();
} else {
LOG.info("META has been assigned to otherwhere, skip assigning.");
}
try {
if (this.shouldSplitWal && distributedLogReplay) {
if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
regionAssignmentWaitTimeout)) {
// Wait here is to avoid log replay hits current dead server and incur a RPC timeout
// when replay happens before region assignment completes.
LOG.warn("Region " + HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()
+ " didn't complete assignment in time");
}
this.services.getMasterFileSystem().splitMetaLog(serverName);
}
} catch (Exception ex) {
if (ex instanceof IOException) {
this.services.getExecutorService().submit(this);
this.deadServers.add(serverName);
throw new IOException("failed log splitting for " + serverName + ", will retry", ex);
} else {
throw new IOException(ex);
}
}
gotException = false;
} finally {
if (gotException){
// If we had an exception, this.deadServers.finish will be skipped in super.process()
this.deadServers.finish(serverName);
}
}
super.process();
// Clear this counter on successful handling.
this.eventExceptionCount.set(0);
}
@Override
boolean isCarryingMeta() {
return true;
}
/**
* Before assign the hbase:meta region, ensure it haven't
* been assigned by other place
* <p>
* Under some scenarios, the hbase:meta region can be opened twice, so it seemed online
* in two regionserver at the same time.
* If the hbase:meta region has been assigned, so the operation can be canceled.
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
*/
private void verifyAndAssignMeta()
throws InterruptedException, IOException, KeeperException {
long timeout = this.server.getConfiguration().
getLong("hbase.catalog.verification.timeout", 1000);
if (!server.getMetaTableLocator().verifyMetaRegionLocation(server.getConnection(),
this.server.getZooKeeper(), timeout)) {
this.services.getAssignmentManager().assignMeta(HRegionInfo.FIRST_META_REGIONINFO);
} else if (serverName.equals(server.getMetaTableLocator().getMetaRegionLocation(
this.server.getZooKeeper()))) {
throw new IOException("hbase:meta is onlined on the dead server "
+ serverName);
} else {
LOG.info("Skip assigning hbase:meta, because it is online on the "
+ server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper()));
}
}
/**
* Failed many times, shutdown processing
* @throws IOException
*/
private void verifyAndAssignMetaWithRetries() throws IOException {
int iTimes = this.server.getConfiguration().getInt(
"hbase.catalog.verification.retries", 10);
long waitTime = this.server.getConfiguration().getLong(
"hbase.catalog.verification.timeout", 1000);
int iFlag = 0;
while (true) {
try {
verifyAndAssignMeta();
break;
} catch (KeeperException e) {
this.server.abort("In server shutdown processing, assigning meta", e);
throw new IOException("Aborting", e);
} catch (Exception e) {
if (iFlag >= iTimes) {
this.server.abort("verifyAndAssignMeta failed after" + iTimes
+ " times retries, aborting", e);
throw new IOException("Aborting", e);
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e1) {
LOG.warn("Interrupted when is the thread sleep", e1);
Thread.currentThread().interrupt();
throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
}
iFlag++;
}
}
}
@Override
protected void handleException(Throwable t) {
int count = eventExceptionCount.getAndIncrement();
if (count < 0) count = eventExceptionCount.getAndSet(0);
if (count > SHOW_STRACKTRACE_FREQUENCY) { // Too frequent, let's slow reporting
Threads.sleep(1000);
}
if (count % SHOW_STRACKTRACE_FREQUENCY == 0) {
LOG.error("Caught " + eventType + ", count=" + this.eventExceptionCount, t);
} else {
LOG.error("Caught " + eventType + ", count=" + this.eventExceptionCount +
"; " + t.getMessage() + "; stack trace shows every " + SHOW_STRACKTRACE_FREQUENCY +
"th time.");
}
}
}

View File

@ -1,371 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
/**
* Process server shutdown.
* Server-to-handle must be already in the deadservers lists. See
* {@link ServerManager#expireServer(ServerName)}
*/
@InterfaceAudience.Private
public class ServerShutdownHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(ServerShutdownHandler.class);
protected final ServerName serverName;
protected final MasterServices services;
protected final DeadServer deadServers;
protected final boolean shouldSplitWal; // whether to split WAL or not
protected final int regionAssignmentWaitTimeout;
public ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName,
final boolean shouldSplitWal) {
this(server, services, deadServers, serverName, EventType.M_SERVER_SHUTDOWN,
shouldSplitWal);
}
ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName, EventType type,
final boolean shouldSplitWal) {
super(server, type);
this.serverName = serverName;
this.server = server;
this.services = services;
this.deadServers = deadServers;
if (!this.deadServers.isDeadServer(this.serverName)) {
LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
}
this.shouldSplitWal = shouldSplitWal;
this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
}
@Override
public String getInformativeName() {
if (serverName != null) {
return this.getClass().getSimpleName() + " for " + serverName;
} else {
return super.getInformativeName();
}
}
/**
* @return True if the server we are processing was carrying <code>hbase:meta</code>
*/
boolean isCarryingMeta() {
return false;
}
@Override
public String toString() {
return getClass().getSimpleName() + "-" + serverName + "-" + getSeqid();
}
@Override
public void process() throws IOException {
boolean hasLogReplayWork = false;
final ServerName serverName = this.serverName;
try {
// We don't want worker thread in the MetaServerShutdownHandler
// executor pool to block by waiting availability of hbase:meta
// Otherwise, it could run into the following issue:
// 1. The current MetaServerShutdownHandler instance For RS1 waits for the hbase:meta
// to come online.
// 2. The newly assigned hbase:meta region server RS2 was shutdown right after
// it opens the hbase:meta region. So the MetaServerShutdownHandler
// instance For RS1 will still be blocked.
// 3. The new instance of MetaServerShutdownHandler for RS2 is queued.
// 4. The newly assigned hbase:meta region server RS3 was shutdown right after
// it opens the hbase:meta region. So the MetaServerShutdownHandler
// instance For RS1 and RS2 will still be blocked.
// 5. The new instance of MetaServerShutdownHandler for RS3 is queued.
// 6. Repeat until we run out of MetaServerShutdownHandler worker threads
// The solution here is to resubmit a ServerShutdownHandler request to process
// user regions on that server so that MetaServerShutdownHandler
// executor pool is always available.
//
// If AssignmentManager hasn't finished rebuilding user regions,
// we are not ready to assign dead regions either. So we re-queue up
// the dead server for further processing too.
AssignmentManager am = services.getAssignmentManager();
ServerManager serverManager = services.getServerManager();
if (isCarryingMeta() /* hbase:meta */ || !am.isFailoverCleanupDone()) {
serverManager.processDeadServer(serverName, this.shouldSplitWal);
return;
}
// Wait on meta to come online; we need it to progress.
// TODO: Best way to hold strictly here? We should build this retry logic
// into the MetaTableAccessor operations themselves.
// TODO: Is the reading of hbase:meta necessary when the Master has state of
// cluster in its head? It should be possible to do without reading hbase:meta
// in all but one case. On split, the RS updates the hbase:meta
// table and THEN informs the master of the split via zk nodes in
// 'unassigned' dir. Currently the RS puts ephemeral nodes into zk so if
// the regionserver dies, these nodes do not stick around and this server
// shutdown processing does fixup (see the fixupDaughters method below).
// If we wanted to skip the hbase:meta scan, we'd have to change at least the
// final SPLIT message to be permanent in zk so in here we'd know a SPLIT
// completed (zk is updated after edits to hbase:meta have gone in). See
// {@link SplitTransaction}. We'd also have to be figure another way for
// doing the below hbase:meta daughters fixup.
Set<HRegionInfo> hris = null;
try {
server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
if (BaseLoadBalancer.tablesOnMaster(server.getConfiguration())) {
while (!this.server.isStopped() && serverManager.countOfRegionServers() < 2) {
// Wait till at least another regionserver is up besides the active master
// so that we don't assign all regions to the active master.
// This is best of efforts, because newly joined regionserver
// could crash right after that.
Thread.sleep(100);
}
}
hris = am.getRegionStates().getServerRegions(serverName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
if (this.server.isStopped()) {
throw new IOException("Server is stopped");
}
// delayed to set recovery mode based on configuration only after all outstanding splitlogtask
// drained
this.services.getMasterFileSystem().setLogRecoveryMode();
boolean distributedLogReplay =
(this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
try {
if (this.shouldSplitWal) {
if (distributedLogReplay) {
LOG.info("Mark regions in recovery for crashed server " + serverName +
" before assignment; regions=" + hris);
MasterFileSystem mfs = this.services.getMasterFileSystem();
mfs.prepareLogReplay(serverName, hris);
} else {
LOG.info("Splitting logs for " + serverName +
" before assignment; region count=" + (hris == null ? 0 : hris.size()));
this.services.getMasterFileSystem().splitLog(serverName);
}
am.getRegionStates().logSplit(serverName);
} else {
LOG.info("Skipping log splitting for " + serverName);
}
} catch (IOException ioe) {
resubmit(serverName, ioe);
}
List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
int replicaCount = services.getConfiguration().getInt(HConstants.META_REPLICAS_NUM,
HConstants.DEFAULT_META_REPLICA_NUM);
for (int i = 1; i < replicaCount; i++) {
HRegionInfo metaHri =
RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, i);
if (am.isCarryingMetaReplica(serverName, metaHri)) {
LOG.info("Reassigning meta replica" + metaHri + " that was on " + serverName);
toAssignRegions.add(metaHri);
}
}
// Clean out anything in regions in transition. Being conservative and
// doing after log splitting. Could do some states before -- OPENING?
// OFFLINE? -- and then others after like CLOSING that depend on log
// splitting.
List<HRegionInfo> regionsInTransition = am.processServerShutdown(serverName);
LOG.info("Reassigning " + ((hris == null)? 0: hris.size()) +
" region(s) that " + (serverName == null? "null": serverName) +
" was carrying (and " + regionsInTransition.size() +
" regions(s) that were opening on this server)");
toAssignRegions.addAll(regionsInTransition);
// Iterate regions that were on this server and assign them
if (hris != null && !hris.isEmpty()) {
RegionStates regionStates = am.getRegionStates();
for (HRegionInfo hri: hris) {
if (regionsInTransition.contains(hri)) {
continue;
}
String encodedName = hri.getEncodedName();
Lock lock = am.acquireRegionLock(encodedName);
try {
RegionState rit = regionStates.getRegionTransitionState(hri);
if (processDeadRegion(hri, am)) {
ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
// If this region is in transition on the dead server, it must be
// opening or pending_open, which should have been covered by AM#processServerShutdown
LOG.info("Skip assigning region " + hri.getRegionNameAsString()
+ " because it has been opened in " + addressFromAM.getServerName());
continue;
}
if (rit != null) {
if (rit.getServerName() != null && !rit.isOnServer(serverName)) {
// Skip regions that are in transition on other server
LOG.info("Skip assigning region in transition on other server" + rit);
continue;
}
LOG.info("Reassigning region with rs = " + rit);
regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
} else if (regionStates.isRegionInState(
hri, RegionState.State.SPLITTING_NEW, RegionState.State.MERGING_NEW)) {
regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
}
toAssignRegions.add(hri);
} else if (rit != null) {
if ((rit.isClosing() || rit.isFailedClose() || rit.isOffline())
&& am.getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLED, TableState.State.DISABLING) ||
am.getReplicasToClose().contains(hri)) {
// If the table was partially disabled and the RS went down, we should clear the RIT
// and remove the node for the region.
// The rit that we use may be stale in case the table was in DISABLING state
// but though we did assign we will not be clearing the znode in CLOSING state.
// Doing this will have no harm. See HBASE-5927
regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
am.offlineDisabledRegion(hri);
} else {
LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition "
+ rit + " not to be assigned by SSH of server " + serverName);
}
}
} finally {
lock.unlock();
}
}
}
try {
am.assign(toAssignRegions);
} catch (InterruptedException ie) {
LOG.error("Caught " + ie + " during round-robin assignment");
throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
} catch (IOException ioe) {
LOG.info("Caught " + ioe + " during region assignment, will retry");
// Only do wal splitting if shouldSplitWal and in DLR mode
serverManager.processDeadServer(serverName,
this.shouldSplitWal && distributedLogReplay);
return;
}
if (this.shouldSplitWal && distributedLogReplay) {
// wait for region assignment completes
for (HRegionInfo hri : toAssignRegions) {
try {
if (!am.waitOnRegionToClearRegionsInTransition(hri, regionAssignmentWaitTimeout)) {
// Wait here is to avoid log replay hits current dead server and incur a RPC timeout
// when replay happens before region assignment completes.
LOG.warn("Region " + hri.getEncodedName()
+ " didn't complete assignment in time");
}
} catch (InterruptedException ie) {
throw new InterruptedIOException("Caught " + ie
+ " during waitOnRegionToClearRegionsInTransition");
}
}
// submit logReplay work
this.services.getExecutorService().submit(
new LogReplayHandler(this.server, this.services, this.deadServers, this.serverName));
hasLogReplayWork = true;
}
} finally {
this.deadServers.finish(serverName);
}
if (!hasLogReplayWork) {
LOG.info("Finished processing of shutdown of " + serverName);
}
}
private void resubmit(final ServerName serverName, IOException ex) throws IOException {
// typecast to SSH so that we make sure that it is the SSH instance that
// gets submitted as opposed to MSSH or some other derived instance of SSH
this.services.getExecutorService().submit((ServerShutdownHandler) this);
this.deadServers.add(serverName);
throw new IOException("failed log splitting for " + serverName + ", will retry", ex);
}
/**
* Process a dead region from a dead RS. Checks if the region is disabled or
* disabling or if the region has a partially completed split.
* @param hri
* @param assignmentManager
* @return Returns true if specified region should be assigned, false if not.
* @throws IOException
*/
public static boolean processDeadRegion(HRegionInfo hri,
AssignmentManager assignmentManager)
throws IOException {
boolean tablePresent = assignmentManager.getTableStateManager().isTablePresent(hri.getTable());
if (!tablePresent) {
LOG.info("The table " + hri.getTable()
+ " was deleted. Hence not proceeding.");
return false;
}
// If table is not disabled but the region is offlined,
boolean disabled = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLED);
if (disabled){
LOG.info("The table " + hri.getTable()
+ " was disabled. Hence not proceeding.");
return false;
}
if (hri.isOffline() && hri.isSplit()) {
//HBASE-7721: Split parent and daughters are inserted into hbase:meta as an atomic operation.
//If the meta scanner saw the parent split, then it should see the daughters as assigned
//to the dead server. We don't have to do anything.
return false;
}
boolean disabling = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLING);
if (disabling) {
LOG.info("The table " + hri.getTable()
+ " is disabled. Hence not assigning region" + hri.getEncodedName());
return false;
}
return true;
}
}

View File

@ -184,14 +184,14 @@ public class AddColumnFamilyProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false;
return env.getProcedureQueue().tryAcquireTableWrite(
return env.getProcedureQueue().tryAcquireTableExclusiveLock(
tableName,
EventType.C_M_ADD_FAMILY.toString());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableWrite(tableName);
env.getProcedureQueue().releaseTableExclusiveLock(tableName);
}
@Override
@ -404,4 +404,4 @@ public class AddColumnFamilyProcedure
}
return regionInfoList;
}
}
}

View File

@ -265,12 +265,12 @@ public class CreateTableProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
return env.getProcedureQueue().tryAcquireTableWrite(getTableName(), "create table");
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table");
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableWrite(getTableName());
env.getProcedureQueue().releaseTableExclusiveLock(getTableName());
}
private boolean prepareCreate(final MasterProcedureEnv env) throws IOException {
@ -444,4 +444,4 @@ public class CreateTableProcedure
final TableName tableName) throws IOException {
env.getMasterServices().getTableDescriptors().get(tableName);
}
}
}

View File

@ -200,14 +200,14 @@ public class DeleteColumnFamilyProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false;
return env.getProcedureQueue().tryAcquireTableWrite(
return env.getProcedureQueue().tryAcquireTableExclusiveLock(
tableName,
EventType.C_M_DELETE_FAMILY.toString());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableWrite(tableName);
env.getProcedureQueue().releaseTableExclusiveLock(tableName);
}
@Override
@ -436,4 +436,4 @@ public class DeleteColumnFamilyProcedure
}
return regionInfoList;
}
}
}

View File

@ -200,12 +200,12 @@ public class DeleteTableProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false;
return env.getProcedureQueue().tryAcquireTableWrite(getTableName(), "delete table");
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table");
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableWrite(getTableName());
env.getProcedureQueue().releaseTableExclusiveLock(getTableName());
}
@Override
@ -407,4 +407,4 @@ public class DeleteTableProcedure
throws IOException {
ProcedureSyncWait.getMasterQuotaManager(env).removeTableFromNamespaceQuota(tableName);
}
}
}

View File

@ -214,14 +214,14 @@ public class DisableTableProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false;
return env.getProcedureQueue().tryAcquireTableWrite(
return env.getProcedureQueue().tryAcquireTableExclusiveLock(
tableName,
EventType.C_M_DISABLE_TABLE.toString());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableWrite(tableName);
env.getProcedureQueue().releaseTableExclusiveLock(tableName);
}
@Override
@ -537,4 +537,4 @@ public class DisableTableProcedure
return regions != null && regions.isEmpty();
}
}
}
}

View File

@ -93,9 +93,9 @@ public class EnableTableProcedure
/**
* Constructor
* @param env MasterProcedureEnv
* @throws IOException
* @param tableName the table to operate on
* @param skipTableStateCheck whether to check table state
* @throws IOException
*/
public EnableTableProcedure(
final MasterProcedureEnv env,
@ -234,14 +234,14 @@ public class EnableTableProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false;
return env.getProcedureQueue().tryAcquireTableWrite(
return env.getProcedureQueue().tryAcquireTableExclusiveLock(
tableName,
EventType.C_M_ENABLE_TABLE.toString());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableWrite(tableName);
env.getProcedureQueue().releaseTableExclusiveLock(tableName);
}
@Override

View File

@ -120,4 +120,4 @@ public class MasterProcedureEnv {
public boolean isInitialized() {
return master.isInitialized();
}
}
}

View File

@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableNotFoundException;
@ -43,11 +44,12 @@ import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOpe
* ProcedureRunnableSet for the Master Procedures.
* This RunnableSet tries to provide to the ProcedureExecutor procedures
* that can be executed without having to wait on a lock.
* Most of the master operations can be executed concurrently, if the they
* Most of the master operations can be executed concurrently, if they
* are operating on different tables (e.g. two create table can be performed
* at the same, time assuming table A and table B).
* at the same time, assuming table A and table B) or against two different servers; say
* two servers that crashed at about the same time.
*
* Each procedure should implement an interface providing information for this queue.
* <p>Each procedure should implement an interface providing information for this queue.
* For example, table-related procedures should implement TableProcedureInterface.
* Each procedure will be pushed in its own queue, and based on the operation type
* we may take smarter decisions, e.g. we can abort all the operations preceding
@ -58,7 +60,18 @@ import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOpe
public class MasterProcedureQueue implements ProcedureRunnableSet {
private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class);
private final ProcedureFairRunQueues<TableName, RunQueue> fairq;
// Two queues to ensure that server procedures always run ahead of table procedures.
private final ProcedureFairRunQueues<TableName, RunQueue> tableFairQ;
/**
* Rely on basic fair q. ServerCrashProcedure will yield if meta is not assigned. This way, the
* server that was carrying meta should rise to the top of the queue (this is how it used to
* work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers
* that were carrying system tables on crash; do I need to have these servers have priority?
*
* <p>Apart from the special-casing of meta and system tables, fairq is what we want
*/
private final ProcedureFairRunQueues<ServerName, RunQueue> serverFairQ;
private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition();
private final TableLockManager lockManager;
@ -66,11 +79,16 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
private final int metaTablePriority;
private final int userTablePriority;
private final int sysTablePriority;
private static final int DEFAULT_SERVER_PRIORITY = 1;
/**
* Keeps count across server and table queues.
*/
private int queueSize;
public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) {
this.fairq = new ProcedureFairRunQueues<TableName, RunQueue>(1);
this.tableFairQ = new ProcedureFairRunQueues<TableName, RunQueue>(1);
this.serverFairQ = new ProcedureFairRunQueues<ServerName, RunQueue>(1);
this.lockManager = lockManager;
// TODO: should this be part of the HTD?
@ -105,12 +123,13 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
@Override
public void yield(final Procedure proc) {
addFront(proc);
addBack(proc);
}
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public Long poll() {
Long pollResult = null;
lock.lock();
try {
if (queueSize == 0) {
@ -119,19 +138,25 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
return null;
}
}
RunQueue queue = fairq.poll();
if (queue != null && queue.isAvailable()) {
queueSize--;
return queue.poll();
// For now, let server handling have precedence over table handling; the presumption is that it
// is more important to handle crashed servers than to run table
// enabling/disabling, etc.
pollResult = doPoll(serverFairQ.poll());
if (pollResult == null) {
pollResult = doPoll(tableFairQ.poll());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
lock.unlock();
}
return null;
return pollResult;
}
private Long doPoll(final RunQueue rq) {
if (rq == null || !rq.isAvailable()) return null;
this.queueSize--;
return rq.poll();
}
@Override
@ -148,7 +173,8 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
public void clear() {
lock.lock();
try {
fairq.clear();
serverFairQ.clear();
tableFairQ.clear();
queueSize = 0;
} finally {
lock.unlock();
@ -169,7 +195,8 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
public String toString() {
lock.lock();
try {
return "MasterProcedureQueue size=" + queueSize + ": " + fairq;
return "MasterProcedureQueue size=" + queueSize + ": tableFairQ: " + tableFairQ +
", serverFairQ: " + serverFairQ;
} finally {
lock.unlock();
}
@ -197,6 +224,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
markTableAsDeleted(iProcTable.getTableName());
}
}
// No cleanup for ServerProcedureInterface types, yet.
}
private RunQueue getRunQueueOrCreate(final Procedure proc) {
@ -204,17 +232,26 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
final TableName table = ((TableProcedureInterface)proc).getTableName();
return getRunQueueOrCreate(table);
}
// TODO: at the moment we only have Table procedures
// if you are implementing a non-table procedure, you have two option create
// a group for all the non-table procedures or try to find a key for your
// non-table procedure and implement something similar to the TableRunQueue.
if (proc instanceof ServerProcedureInterface) {
return getRunQueueOrCreate((ServerProcedureInterface)proc);
}
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
// a group for all the non-table/non-server procedures or try to find a key for your
// non-table/non-server procedures and implement something similar to the TableRunQueue.
throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet");
}
private TableRunQueue getRunQueueOrCreate(final TableName table) {
final TableRunQueue queue = getRunQueue(table);
if (queue != null) return queue;
return (TableRunQueue)fairq.add(table, createTableRunQueue(table));
return (TableRunQueue)tableFairQ.add(table, createTableRunQueue(table));
}
private ServerRunQueue getRunQueueOrCreate(final ServerProcedureInterface spi) {
final ServerRunQueue queue = getRunQueue(spi.getServerName());
if (queue != null) return queue;
return (ServerRunQueue)serverFairQ.add(spi.getServerName(), createServerRunQueue(spi));
}
private TableRunQueue createTableRunQueue(final TableName table) {
@ -227,8 +264,35 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
return new TableRunQueue(priority);
}
private ServerRunQueue createServerRunQueue(final ServerProcedureInterface spi) {
return new ServerRunQueue(DEFAULT_SERVER_PRIORITY);
}
private TableRunQueue getRunQueue(final TableName table) {
return (TableRunQueue)fairq.get(table);
return (TableRunQueue)tableFairQ.get(table);
}
private ServerRunQueue getRunQueue(final ServerName sn) {
return (ServerRunQueue)serverFairQ.get(sn);
}
/**
* Try to acquire the write lock on the specified table.
* Other operations in the table-queue will be executed after the lock is released.
* @param table Table to lock
* @param purpose Human readable reason for locking the table
* @return true if we were able to acquire the lock on the table, otherwise false.
*/
public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
return getRunQueueOrCreate(table).tryExclusiveLock(lockManager, table, purpose);
}
/**
* Release the write lock taken with tryAcquireTableExclusiveLock()
* @param table the name of the table that has the write lock
*/
public void releaseTableExclusiveLock(final TableName table) {
getRunQueue(table).releaseExclusiveLock(lockManager, table);
}
/**
@ -239,35 +303,54 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
* @param purpose Human readable reason for locking the table
* @return true if we were able to acquire the lock on the table, otherwise false.
*/
public boolean tryAcquireTableRead(final TableName table, final String purpose) {
return getRunQueueOrCreate(table).tryRead(lockManager, table, purpose);
public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
return getRunQueueOrCreate(table).trySharedLock(lockManager, table, purpose);
}
/**
* Release the read lock taken with tryAcquireTableSharedLock()
* @param table the name of the table that has the read lock
*/
public void releaseTableRead(final TableName table) {
getRunQueue(table).releaseRead(lockManager, table);
public void releaseTableSharedLock(final TableName table) {
getRunQueue(table).releaseSharedLock(lockManager, table);
}
/**
* Try to acquire the write lock on the specified table.
* other operations in the table-queue will be executed after the lock is released.
* @param table Table to lock
* @param purpose Human readable reason for locking the table
* @return true if we were able to acquire the lock on the table, otherwise false.
* Try to acquire the write lock on the specified server.
* @see #releaseServerExclusiveLock(ServerProcedureInterface)
* @param spi Server to lock
* @return true if we were able to acquire the lock on the server, otherwise false.
*/
public boolean tryAcquireTableWrite(final TableName table, final String purpose) {
return getRunQueueOrCreate(table).tryWrite(lockManager, table, purpose);
public boolean tryAcquireServerExclusiveLock(final ServerProcedureInterface spi) {
return getRunQueueOrCreate(spi).tryExclusiveLock();
}
/**
* Release the write lock taken with tryAcquireTableWrite()
* @param table the name of the table that has the write lock
* Release the write lock
* @see #tryAcquireServerExclusiveLock(ServerProcedureInterface)
* @param spi the server that has the write lock
*/
public void releaseTableWrite(final TableName table) {
getRunQueue(table).releaseWrite(lockManager, table);
public void releaseServerExclusiveLock(final ServerProcedureInterface spi) {
getRunQueue(spi.getServerName()).releaseExclusiveLock();
}
/**
* Try to acquire the read lock on the specified server.
* @see #releaseServerSharedLock(ServerProcedureInterface)
* @param spi Server to lock
* @return true if we were able to acquire the lock on the server, otherwise false.
*/
public boolean tryAcquireServerSharedLock(final ServerProcedureInterface spi) {
return getRunQueueOrCreate(spi).trySharedLock();
}
/**
* Release the read lock taken
* @see #tryAcquireServerSharedLock(ServerProcedureInterface)
* @param spi the server that has the read lock
*/
public void releaseServerSharedLock(final ServerProcedureInterface spi) {
getRunQueue(spi.getServerName()).releaseSharedLock();
}
/**
@ -284,7 +367,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
lock.lock();
try {
if (queue.isEmpty() && !queue.isLocked()) {
fairq.remove(table);
tableFairQ.remove(table);
// Remove the table lock
try {
@ -310,115 +393,168 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
boolean isLocked();
}
/**
* Base abstract class for RunQueue implementations.
* Be careful to honor synchronization in subclasses. In here we protect access, but if you are
* acting on state found in here, be sure dependent code keeps synchronization.
* Implements basic in-memory read/write locking mechanism to prevent procedure steps being run
* in parallel.
*/
private static abstract class AbstractRunQueue implements RunQueue {
// All modification of runnables happens with #lock held.
private final Deque<Long> runnables = new ArrayDeque<Long>();
private final int priority;
private boolean exclusiveLock = false;
private int sharedLock = 0;
public AbstractRunQueue(int priority) {
this.priority = priority;
}
boolean isEmpty() {
return this.runnables.isEmpty();
}
@Override
public boolean isAvailable() {
synchronized (this) {
return !exclusiveLock && !runnables.isEmpty();
}
}
@Override
public int getPriority() {
return this.priority;
}
@Override
public void addFront(Procedure proc) {
this.runnables.addFirst(proc.getProcId());
}
@Override
public void addBack(Procedure proc) {
this.runnables.addLast(proc.getProcId());
}
@Override
public Long poll() {
return this.runnables.poll();
}
@Override
public synchronized boolean isLocked() {
return isExclusiveLock() || sharedLock > 0;
}
public synchronized boolean isExclusiveLock() {
return this.exclusiveLock;
}
public synchronized boolean trySharedLock() {
if (isExclusiveLock()) return false;
sharedLock++;
return true;
}
public synchronized void releaseSharedLock() {
sharedLock--;
}
/**
* @return True if only one instance of a shared lock is outstanding.
*/
synchronized boolean isSingleSharedLock() {
return sharedLock == 1;
}
public synchronized boolean tryExclusiveLock() {
if (isLocked()) return false;
exclusiveLock = true;
return true;
}
public synchronized void releaseExclusiveLock() {
exclusiveLock = false;
}
@Override
public String toString() {
return this.runnables.toString();
}
}
/**
* Run Queue for Server procedures.
*/
private static class ServerRunQueue extends AbstractRunQueue {
public ServerRunQueue(int priority) {
super(priority);
}
}
/**
* Run Queue for a Table. It contains a read-write lock that is used by the
* MasterProcedureQueue to decide if we should fetch an item from this queue
* or skip to another one which will be able to run without waiting for locks.
*/
private static class TableRunQueue implements RunQueue {
private final Deque<Long> runnables = new ArrayDeque<Long>();
private final int priority;
private static class TableRunQueue extends AbstractRunQueue {
private TableLock tableLock = null;
private boolean wlock = false;
private int rlock = 0;
public TableRunQueue(int priority) {
this.priority = priority;
}
@Override
public void addFront(final Procedure proc) {
runnables.addFirst(proc.getProcId());
super(priority);
}
// TODO: Improve run-queue push with TableProcedureInterface.getType()
// we can take smart decisions based on the type of the operation (e.g. create/delete)
@Override
public void addBack(final Procedure proc) {
runnables.addLast(proc.getProcId());
super.addBack(proc);
}
@Override
public Long poll() {
return runnables.poll();
}
@Override
public boolean isAvailable() {
synchronized (this) {
return !wlock && !runnables.isEmpty();
}
}
public boolean isEmpty() {
return runnables.isEmpty();
}
@Override
public boolean isLocked() {
synchronized (this) {
return wlock || rlock > 0;
}
}
public boolean tryRead(final TableLockManager lockManager,
public synchronized boolean trySharedLock(final TableLockManager lockManager,
final TableName tableName, final String purpose) {
synchronized (this) {
if (wlock) {
return false;
}
if (isExclusiveLock()) return false;
// Take zk-read-lock
tableLock = lockManager.readLock(tableName, purpose);
try {
tableLock.acquire();
} catch (IOException e) {
LOG.error("failed acquire read lock on " + tableName, e);
tableLock = null;
return false;
}
rlock++;
// Take zk-read-lock
tableLock = lockManager.readLock(tableName, purpose);
try {
tableLock.acquire();
} catch (IOException e) {
LOG.error("failed acquire read lock on " + tableName, e);
tableLock = null;
return false;
}
trySharedLock();
return true;
}
public void releaseRead(final TableLockManager lockManager,
public synchronized void releaseSharedLock(final TableLockManager lockManager,
final TableName tableName) {
synchronized (this) {
releaseTableLock(lockManager, rlock == 1);
rlock--;
}
releaseTableLock(lockManager, isSingleSharedLock());
releaseSharedLock();
}
public boolean tryWrite(final TableLockManager lockManager,
public synchronized boolean tryExclusiveLock(final TableLockManager lockManager,
final TableName tableName, final String purpose) {
synchronized (this) {
if (wlock || rlock > 0) {
return false;
}
// Take zk-write-lock
tableLock = lockManager.writeLock(tableName, purpose);
try {
tableLock.acquire();
} catch (IOException e) {
LOG.error("failed acquire write lock on " + tableName, e);
tableLock = null;
return false;
}
wlock = true;
if (isLocked()) return false;
// Take zk-write-lock
tableLock = lockManager.writeLock(tableName, purpose);
try {
tableLock.acquire();
} catch (IOException e) {
LOG.error("failed acquire write lock on " + tableName, e);
tableLock = null;
return false;
}
tryExclusiveLock();
return true;
}
public void releaseWrite(final TableLockManager lockManager,
public synchronized void releaseExclusiveLock(final TableLockManager lockManager,
final TableName tableName) {
synchronized (this) {
releaseTableLock(lockManager, true);
wlock = false;
}
releaseTableLock(lockManager, true);
releaseExclusiveLock();
}
private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
@ -434,15 +570,5 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
}
}
}
@Override
public int getPriority() {
return priority;
}
@Override
public String toString() {
return runnables.toString();
}
}
}
}
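For reference, a minimal usage sketch of the renamed lock API above (test-style Java, not part of this patch; conf, lockManager and tableName are assumed to be in scope). Shared locks on the same table stack, while an exclusive lock excludes everything else until released:

  MasterProcedureQueue queue = new MasterProcedureQueue(conf, lockManager);
  assert queue.tryAcquireTableSharedLock(tableName, "read A");
  assert queue.tryAcquireTableSharedLock(tableName, "read B");    // a second reader is fine
  assert !queue.tryAcquireTableExclusiveLock(tableName, "write"); // the writer is held off
  queue.releaseTableSharedLock(tableName);
  queue.releaseTableSharedLock(tableName);
  assert queue.tryAcquireTableExclusiveLock(tableName, "write");  // now the writer gets in
  queue.releaseTableExclusiveLock(tableName);

Note that the table shared/exclusive lock methods also take the corresponding zk table lock through the supplied lockManager, so the sketch assumes a working TableLockManager.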

View File

@ -182,14 +182,14 @@ public class ModifyColumnFamilyProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false;
return env.getProcedureQueue().tryAcquireTableWrite(
return env.getProcedureQueue().tryAcquireTableExclusiveLock(
tableName,
EventType.C_M_MODIFY_FAMILY.toString());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableWrite(tableName);
env.getProcedureQueue().releaseTableExclusiveLock(tableName);
}
@Override
@ -379,4 +379,4 @@ public class ModifyColumnFamilyProcedure
});
}
}
}
}

View File

@ -214,14 +214,14 @@ public class ModifyTableProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false;
return env.getProcedureQueue().tryAcquireTableWrite(
return env.getProcedureQueue().tryAcquireTableExclusiveLock(
getTableName(),
EventType.C_M_MODIFY_TABLE.toString());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableWrite(getTableName());
env.getProcedureQueue().releaseTableExclusiveLock(getTableName());
}
@Override
@ -507,4 +507,4 @@ public class ModifyTableProcedure
}
return regionInfoList;
}
}
}

View File

@ -0,0 +1,751 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ServerCrashState;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
/**
* Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called
* ServerShutdownHandler.
*
* <p>The procedure flow varies depending on whether meta is assigned, whether we are doing
* distributed log replay versus distributed log splitting, and whether we are to split logs
* at all.
*
* <p>This procedure asks that all crashed servers get processed equally; we yield after the
* completion of each successful flow step. We do this so that we do not 'deadlock' waiting on
* a region assignment before we can replay edits, which could happen if a region has moved and
* there are edits to replay on two servers.
*
* <p>TODO: ASSIGN and WAIT_ON_ASSIGN (at least) are not idempotent. Revisit when assign is pv2.
* TODO: We do not have special handling for system tables.
*/
public class ServerCrashProcedure
extends StateMachineProcedure<MasterProcedureEnv, ServerCrashState>
implements ServerProcedureInterface {
private static final Log LOG = LogFactory.getLog(ServerCrashProcedure.class);
/**
* Configuration key to set how long to wait in ms doing a quick check on meta state.
*/
public static final String KEY_SHORT_WAIT_ON_META =
"hbase.master.servercrash.short.wait.on.meta.ms";
public static final int DEFAULT_SHORT_WAIT_ON_META = 1000;
/**
* Configuration key to set how many retries to cycle before we give up on meta.
* Each attempt will wait at least {@link #KEY_SHORT_WAIT_ON_META} milliseconds.
*/
public static final String KEY_RETRIES_ON_META =
"hbase.master.servercrash.meta.retries";
public static final int DEFAULT_RETRIES_ON_META = 10;
/**
* Configuration key to set how long to wait in ms on regions in transition.
*/
public static final String KEY_WAIT_ON_RIT =
"hbase.master.servercrash.wait.on.rit.ms";
public static final int DEFAULT_WAIT_ON_RIT = 30000;
private static final Set<HRegionInfo> META_REGION_SET = new HashSet<HRegionInfo>();
static {
META_REGION_SET.add(HRegionInfo.FIRST_META_REGIONINFO);
}
/**
* Name of the crashed server to process.
*/
private ServerName serverName;
/**
* Regions that were on the crashed server.
*/
private Set<HRegionInfo> regionsOnCrashedServer;
/**
* Regions to assign. Usually some subset of {@link #regionsOnCrashedServer}
*/
private List<HRegionInfo> regionsToAssign;
private boolean distributedLogReplay = false;
private boolean carryingMeta = false;
private boolean shouldSplitWal;
/**
* Cycles on same state. Good for figuring if we are stuck.
*/
private int cycles = 0;
/**
* Ordinal of the previous state. So we can tell if we are progressing or not. TODO: if useful,
* move this back up into StateMachineProcedure
*/
private int previousState;
/**
* Call this constructor to queue up a Procedure.
* @param serverName Name of the crashed server.
* @param shouldSplitWal True if we should split WALs as part of crashed server processing.
* @param carryingMeta True if carrying hbase:meta table region.
*/
public ServerCrashProcedure(final ServerName serverName,
final boolean shouldSplitWal, final boolean carryingMeta) {
this.serverName = serverName;
this.shouldSplitWal = shouldSplitWal;
this.carryingMeta = carryingMeta;
// Currently not used.
}
/**
* Used when deserializing from a procedure store; we'll construct one of these then call
* {@link #deserializeStateData(InputStream)}. Do not use directly.
*/
public ServerCrashProcedure() {
super();
}
private void throwProcedureYieldException(final String msg) throws ProcedureYieldException {
String logMsg = msg + "; cycle=" + this.cycles + ", running for " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), getStartTime());
// The procedure executor logs ProcedureYieldException at trace level. For now, log these
// yields for server crash processing at DEBUG. Revisit when stable.
if (LOG.isDebugEnabled()) LOG.debug(logMsg);
throw new ProcedureYieldException(logMsg);
}
@Override
protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
throws ProcedureYieldException {
if (LOG.isTraceEnabled()) {
LOG.trace(state);
}
// Keep running count of cycles
if (state.ordinal() != this.previousState) {
this.previousState = state.ordinal();
this.cycles = 0;
} else {
this.cycles++;
}
MasterServices services = env.getMasterServices();
try {
switch (state) {
case SERVER_CRASH_START:
// Is master fully online? If not, yield. No processing of servers unless master is up
if (!services.getAssignmentManager().isFailoverCleanupDone()) {
throwProcedureYieldException("Waiting on master failover to complete");
}
LOG.info("Start processing crashed " + this.serverName);
start(env);
// If carrying meta, process it first. Else, get list of regions on crashed server.
if (this.carryingMeta) setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
else setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
break;
case SERVER_CRASH_GET_REGIONS:
// If hbase:meta is not assigned, yield.
if (!isMetaAssignedQuickTest(env)) {
throwProcedureYieldException("Waiting on hbase:meta assignment");
}
this.regionsOnCrashedServer =
services.getAssignmentManager().getRegionStates().getServerRegions(this.serverName);
// Where to go next? Depends on whether we should split logs at all or if we should do
// distributed log splitting (DLS) vs distributed log replay (DLR).
if (!this.shouldSplitWal) {
setNextState(ServerCrashState.SERVER_CRASH_CALC_REGIONS_TO_ASSIGN);
} else if (this.distributedLogReplay) {
setNextState(ServerCrashState.SERVER_CRASH_PREPARE_LOG_REPLAY);
} else {
setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
}
break;
case SERVER_CRASH_PROCESS_META:
// If we fail processing hbase:meta, yield.
if (!processMeta(env)) {
throwProcedureYieldException("Waiting on regions-in-transition to clear");
}
setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
break;
case SERVER_CRASH_PREPARE_LOG_REPLAY:
prepareLogReplay(env, this.regionsOnCrashedServer);
setNextState(ServerCrashState.SERVER_CRASH_CALC_REGIONS_TO_ASSIGN);
break;
case SERVER_CRASH_SPLIT_LOGS:
splitLogs(env);
// If DLR, go to FINISH. Otherwise, if DLS, go to SERVER_CRASH_CALC_REGIONS_TO_ASSIGN
if (this.distributedLogReplay) setNextState(ServerCrashState.SERVER_CRASH_FINISH);
else setNextState(ServerCrashState.SERVER_CRASH_CALC_REGIONS_TO_ASSIGN);
break;
case SERVER_CRASH_CALC_REGIONS_TO_ASSIGN:
this.regionsToAssign = calcRegionsToAssign(env);
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
break;
case SERVER_CRASH_ASSIGN:
// Assign may not be idempotent. SSH used to requeue the SSH if we got an IOE assigning
// which is what we are mimicking here but it looks prone to double assignment if assign
// fails midway. TODO: Test.
// If no regions to assign, skip assign and skip to the finish.
boolean regions = this.regionsToAssign != null && !this.regionsToAssign.isEmpty();
if (regions) {
if (!assign(env, this.regionsToAssign)) {
throwProcedureYieldException("Failed assign; will retry");
}
}
if (regions && this.shouldSplitWal && distributedLogReplay) {
setNextState(ServerCrashState.SERVER_CRASH_WAIT_ON_ASSIGN);
} else {
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
}
break;
case SERVER_CRASH_WAIT_ON_ASSIGN:
// TODO: The list of regionsToAssign may be more than we actually assigned. See down in
// AM #1629 around 'if (regionStates.wasRegionOnDeadServer(encodedName)) {' where we
// will skip assigning a region because it is/was on a dead server. Should never happen!
// It was on this server. Worst comes to worst, we'll still wait here till other server is
// processed.
// If the wait on assign failed, yield -- if we have regions to assign.
if (this.regionsToAssign != null && !this.regionsToAssign.isEmpty()) {
if (!waitOnAssign(env, this.regionsToAssign)) {
throwProcedureYieldException("Waiting on region assign");
}
}
setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
break;
case SERVER_CRASH_FINISH:
LOG.info("Finished processing of crashed " + serverName);
services.getServerManager().getDeadServers().finish(serverName);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
LOG.warn("Failed serverName=" + this.serverName + ", state=" + state + "; retry", e);
} catch (InterruptedException e) {
// TODO: Make executor allow IEs coming up out of execute.
LOG.warn("Interrupted serverName=" + this.serverName + ", state=" + state + "; retry", e);
Thread.currentThread().interrupt();
}
return Flow.HAS_MORE_STATE;
}
/**
* Start processing of the crashed server. In here we'll just set configs and return.
* @param env
* @throws IOException
*/
private void start(final MasterProcedureEnv env) throws IOException {
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
// Set recovery mode late. This is what the old ServerShutdownHandler used to do.
mfs.setLogRecoveryMode();
this.distributedLogReplay = mfs.getLogRecoveryMode() == RecoveryMode.LOG_REPLAY;
}
/**
* @param env
* @return False if we fail to assign and split logs on meta ('process').
* @throws IOException
* @throws InterruptedException
*/
private boolean processMeta(final MasterProcedureEnv env)
throws IOException {
if (LOG.isDebugEnabled()) LOG.debug("Processing hbase:meta that was on " + this.serverName);
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
HRegionInfo metaHRI = HRegionInfo.FIRST_META_REGIONINFO;
if (this.shouldSplitWal) {
if (this.distributedLogReplay) {
prepareLogReplay(env, META_REGION_SET);
} else {
// TODO: Matteo. We BLOCK here but it is the most important thing to be doing at this moment.
mfs.splitMetaLog(serverName);
am.getRegionStates().logSplit(metaHRI);
}
}
// Assign meta if still carrying it. Check again: region may be assigned because of RIT timeout
boolean processed = true;
if (am.isCarryingMeta(serverName)) {
// TODO: May block here if hard time figuring state of meta.
am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
verifyAndAssignMetaWithRetries(env);
if (this.shouldSplitWal && distributedLogReplay) {
int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT);
if (!waitOnRegionToClearRegionsInTransition(am, metaHRI, timeout)) {
processed = false;
} else {
// TODO: Matteo. We BLOCK here but it is the most important thing to be doing at this moment.
mfs.splitMetaLog(serverName);
}
}
}
return processed;
}
/**
* @return True if region cleared RIT, else false if we timed out waiting.
* @throws InterruptedIOException
*/
private boolean waitOnRegionToClearRegionsInTransition(AssignmentManager am,
final HRegionInfo hri, final int timeout)
throws InterruptedIOException {
try {
if (!am.waitOnRegionToClearRegionsInTransition(hri, timeout)) {
// The wait here is to avoid log replay hitting the current dead server and incurring an RPC
// timeout when replay happens before region assignment completes.
LOG.warn("Region " + hri.getEncodedName() + " didn't complete assignment in time");
return false;
}
} catch (InterruptedException ie) {
throw new InterruptedIOException("Caught " + ie +
" during waitOnRegionToClearRegionsInTransition for " + hri);
}
return true;
}
private void prepareLogReplay(final MasterProcedureEnv env, final Set<HRegionInfo> regions)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Mark " + size(this.regionsOnCrashedServer) +
" regions-in-recovery from " + this.serverName);
}
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
mfs.prepareLogReplay(this.serverName, regions);
am.getRegionStates().logSplit(this.serverName);
}
private void splitLogs(final MasterProcedureEnv env) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting logs from " + serverName + "; region count=" +
size(this.regionsOnCrashedServer));
}
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
// TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running.
mfs.splitLog(this.serverName);
am.getRegionStates().logSplit(this.serverName);
}
static int size(final Collection<HRegionInfo> hris) {
return hris == null? 0: hris.size();
}
/**
* Figure out what we need to assign. Should be idempotent.
* @param env
* @return List of calculated regions to assign; may be empty or null.
* @throws IOException
*/
private List<HRegionInfo> calcRegionsToAssign(final MasterProcedureEnv env)
throws IOException {
AssignmentManager am = env.getMasterServices().getAssignmentManager();
List<HRegionInfo> regionsToAssignAggregator = new ArrayList<HRegionInfo>();
int replicaCount = env.getMasterConfiguration().getInt(HConstants.META_REPLICAS_NUM,
HConstants.DEFAULT_META_REPLICA_NUM);
for (int i = 1; i < replicaCount; i++) {
HRegionInfo metaHri =
RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, i);
if (am.isCarryingMetaReplica(this.serverName, metaHri)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Reassigning meta replica" + metaHri + " that was on " + this.serverName);
}
regionsToAssignAggregator.add(metaHri);
}
}
// Clean out anything in regions in transition.
List<HRegionInfo> regionsInTransition = am.cleanOutCrashedServerReferences(serverName);
if (LOG.isDebugEnabled()) {
LOG.debug("Reassigning " + size(this.regionsOnCrashedServer) +
" region(s) that " + (serverName == null? "null": serverName) +
" was carrying (and " + regionsInTransition.size() +
" regions(s) that were opening on this server)");
}
regionsToAssignAggregator.addAll(regionsInTransition);
// Iterate regions that were on this server and figure which of these we need to reassign
if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) {
RegionStates regionStates = am.getRegionStates();
for (HRegionInfo hri: this.regionsOnCrashedServer) {
if (regionsInTransition.contains(hri)) continue;
String encodedName = hri.getEncodedName();
Lock lock = am.acquireRegionLock(encodedName);
try {
RegionState rit = regionStates.getRegionTransitionState(hri);
if (processDeadRegion(hri, am)) {
ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
// If this region is in transition on the dead server, it must be
// opening or pending_open, which should have been covered by
// AM#cleanOutCrashedServerReferences
LOG.info("Skip assigning region " + hri.getRegionNameAsString()
+ " because it has been opened in " + addressFromAM.getServerName());
continue;
}
if (rit != null) {
if (rit.getServerName() != null && !rit.isOnServer(this.serverName)) {
// Skip regions that are in transition on other server
LOG.info("Skip assigning region in transition on other server" + rit);
continue;
}
LOG.info("Reassigning region " + rit + " and clearing zknode if exists");
regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
} else if (regionStates.isRegionInState(
hri, RegionState.State.SPLITTING_NEW, RegionState.State.MERGING_NEW)) {
regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
}
regionsToAssignAggregator.add(hri);
// TODO: The below else if is different in branch-1 from master branch.
} else if (rit != null) {
if ((rit.isClosing() || rit.isFailedClose() || rit.isOffline())
&& am.getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLED, TableState.State.DISABLING) ||
am.getReplicasToClose().contains(hri)) {
// If the table was partially disabled and the RS went down, we should clear the
// RIT and remove the node for the region.
// The rit that we use may be stale in case the table was in DISABLING state
// but though we did assign we will not be clearing the znode in CLOSING state.
// Doing this will have no harm. See HBASE-5927
regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
am.offlineDisabledRegion(hri);
} else {
LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition "
+ rit + " not to be assigned by SSH of server " + serverName);
}
}
} finally {
lock.unlock();
}
}
}
return regionsToAssignAggregator;
}
private boolean assign(final MasterProcedureEnv env, final List<HRegionInfo> hris)
throws InterruptedIOException {
AssignmentManager am = env.getMasterServices().getAssignmentManager();
try {
am.assign(hris);
} catch (InterruptedException ie) {
LOG.error("Caught " + ie + " during round-robin assignment");
throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
} catch (IOException ioe) {
LOG.info("Caught " + ioe + " during region assignment, will retry");
return false;
}
return true;
}
private boolean waitOnAssign(final MasterProcedureEnv env, final List<HRegionInfo> hris)
throws InterruptedIOException {
int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT);
for (HRegionInfo hri: hris) {
// TODO: Blocks here.
if (!waitOnRegionToClearRegionsInTransition(env.getMasterServices().getAssignmentManager(),
hri, timeout)) {
return false;
}
}
return true;
}
@Override
protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
throws IOException {
// Can't rollback.
throw new UnsupportedOperationException("unhandled state=" + state);
}
@Override
protected ServerCrashState getState(int stateId) {
return ServerCrashState.valueOf(stateId);
}
@Override
protected int getStateId(ServerCrashState state) {
return state.getNumber();
}
@Override
protected ServerCrashState getInitialState() {
return ServerCrashState.SERVER_CRASH_START;
}
@Override
protected boolean abort(MasterProcedureEnv env) {
// TODO
return false;
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.getMasterServices().isServerCrashProcessingEnabled()) return false;
return env.getProcedureQueue().tryAcquireServerExclusiveLock(this);
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseServerExclusiveLock(this);
}
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" serverName=");
sb.append(this.serverName);
sb.append(", shouldSplitWal=");
sb.append(shouldSplitWal);
sb.append(", carryingMeta=");
sb.append(carryingMeta);
}
@Override
public void serializeStateData(final OutputStream stream) throws IOException {
super.serializeStateData(stream);
MasterProcedureProtos.ServerCrashStateData.Builder state =
MasterProcedureProtos.ServerCrashStateData.newBuilder().
setServerName(ProtobufUtil.toServerName(this.serverName)).
setDistributedLogReplay(this.distributedLogReplay).
setCarryingMeta(this.carryingMeta).
setShouldSplitWal(this.shouldSplitWal);
if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) {
for (HRegionInfo hri: this.regionsOnCrashedServer) {
state.addRegionsOnCrashedServer(HRegionInfo.convert(hri));
}
}
if (this.regionsToAssign != null && !this.regionsToAssign.isEmpty()) {
for (HRegionInfo hri: this.regionsToAssign) {
state.addRegionsToAssign(HRegionInfo.convert(hri));
}
}
state.build().writeDelimitedTo(stream);
}
@Override
public void deserializeStateData(final InputStream stream) throws IOException {
super.deserializeStateData(stream);
MasterProcedureProtos.ServerCrashStateData state =
MasterProcedureProtos.ServerCrashStateData.parseDelimitedFrom(stream);
this.serverName = ProtobufUtil.toServerName(state.getServerName());
this.distributedLogReplay = state.hasDistributedLogReplay()?
state.getDistributedLogReplay(): false;
this.carryingMeta = state.hasCarryingMeta()? state.getCarryingMeta(): false;
// shouldSplitWAL has a default over in pb so this invocation will always work.
this.shouldSplitWal = state.getShouldSplitWal();
int size = state.getRegionsOnCrashedServerCount();
if (size > 0) {
this.regionsOnCrashedServer = new HashSet<HRegionInfo>(size);
for (RegionInfo ri: state.getRegionsOnCrashedServerList()) {
this.regionsOnCrashedServer.add(HRegionInfo.convert(ri));
}
}
size = state.getRegionsToAssignCount();
if (size > 0) {
this.regionsToAssign = new ArrayList<HRegionInfo>(size);
for (RegionInfo ri: state.getRegionsToAssignList()) {
this.regionsToAssign.add(HRegionInfo.convert(ri));
}
}
}
/**
* Process a dead region from a dead RS. Checks if the region is disabled or
* disabling or if the region has a partially completed split.
* @param hri
* @param assignmentManager
* @return Returns true if specified region should be assigned, false if not.
* @throws IOException
*/
private static boolean processDeadRegion(HRegionInfo hri, AssignmentManager assignmentManager)
throws IOException {
boolean tablePresent = assignmentManager.getTableStateManager().isTablePresent(hri.getTable());
if (!tablePresent) {
LOG.info("The table " + hri.getTable() + " was deleted. Hence not proceeding.");
return false;
}
// If table is not disabled but the region is offlined,
boolean disabled = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLED);
if (disabled) {
LOG.info("The table " + hri.getTable() + " was disabled. Hence not proceeding.");
return false;
}
if (hri.isOffline() && hri.isSplit()) {
// HBASE-7721: Split parent and daughters are inserted into hbase:meta as an atomic operation.
// If the meta scanner saw the parent split, then it should see the daughters as assigned
// to the dead server. We don't have to do anything.
return false;
}
boolean disabling = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLING);
if (disabling) {
LOG.info("The table " + hri.getTable() + " is disabled. Hence not assigning region" +
hri.getEncodedName());
return false;
}
return true;
}
/**
* If hbase:meta is not assigned already, assign.
* @throws IOException
*/
private void verifyAndAssignMetaWithRetries(final MasterProcedureEnv env) throws IOException {
MasterServices services = env.getMasterServices();
int iTimes = services.getConfiguration().getInt(KEY_RETRIES_ON_META, DEFAULT_RETRIES_ON_META);
// Just reuse same time as we have for short wait on meta. Adding another config is overkill.
long waitTime =
services.getConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META);
int iFlag = 0;
while (true) {
try {
verifyAndAssignMeta(env);
break;
} catch (KeeperException e) {
services.abort("In server shutdown processing, assigning meta", e);
throw new IOException("Aborting", e);
} catch (Exception e) {
if (iFlag >= iTimes) {
services.abort("verifyAndAssignMeta failed after" + iTimes + " retries, aborting", e);
throw new IOException("Aborting", e);
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e1) {
LOG.warn("Interrupted when is the thread sleep", e1);
Thread.currentThread().interrupt();
throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
}
iFlag++;
}
}
}
/**
* If hbase:meta is not assigned already, assign.
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
*/
private void verifyAndAssignMeta(final MasterProcedureEnv env)
throws InterruptedException, IOException, KeeperException {
MasterServices services = env.getMasterServices();
if (!isMetaAssignedQuickTest(env)) {
services.getAssignmentManager().assignMeta(HRegionInfo.FIRST_META_REGIONINFO);
} else if (serverName.equals(services.getMetaTableLocator().
getMetaRegionLocation(services.getZooKeeper()))) {
throw new IOException("hbase:meta is onlined on the dead server " + this.serverName);
} else {
LOG.info("Skip assigning hbase:meta because it is online at "
+ services.getMetaTableLocator().getMetaRegionLocation(services.getZooKeeper()));
}
}
/**
* A quick test that hbase:meta is assigned; blocks for short time only.
* @return True if hbase:meta location is available and verified as good.
* @throws InterruptedException
* @throws IOException
*/
private boolean isMetaAssignedQuickTest(final MasterProcedureEnv env)
throws InterruptedException, IOException {
ZooKeeperWatcher zkw = env.getMasterServices().getZooKeeper();
MetaTableLocator mtl = env.getMasterServices().getMetaTableLocator();
boolean metaAssigned = false;
// Is hbase:meta location available yet?
if (mtl.isLocationAvailable(zkw)) {
ClusterConnection connection = env.getMasterServices().getConnection();
// Is hbase:meta location good yet?
long timeout =
env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META);
if (mtl.verifyMetaRegionLocation(connection, zkw, timeout)) {
metaAssigned = true;
}
}
return metaAssigned;
}
@Override
public ServerName getServerName() {
return this.serverName;
}
@Override
public boolean hasMetaTableRegion() {
return this.carryingMeta;
}
/**
* For this procedure, yield at end of each successful flow step so that all crashed servers
* can make progress rather than do the default which has each procedure running to completion
* before we move to the next. For crashed servers, especially if running with distributed log
* replay, we will want all servers to come along; we do not want the scenario where a server is
* stuck waiting for regions to come online so it can replay edits.
*/
@Override
protected boolean isYieldAfterSuccessfulFlowStateStep() {
return true;
}
}
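An illustrative sketch of how the master side might queue this procedure once a regionserver expires (not part of this patch; master is assumed to be the HMaster instance and getMasterProcedureExecutor()/getAssignmentManager() its usual accessors):

  ServerName crashedServer = ServerName.valueOf("rs1.example.org,16020,1431000000000");
  boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(crashedServer);
  long procId = master.getMasterProcedureExecutor().submitProcedure(
      new ServerCrashProcedure(crashedServer, true /* shouldSplitWal */, carryingMeta));

Because acquireLock checks isServerCrashProcessingEnabled(), a submitted instance simply waits in the queue until the master is ready to process crashed servers.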

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Procedures that handle servers -- e.g. server crash -- must implement this Interface.
* It is used by the procedure runner to figure out locking and which queue to use.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ServerProcedureInterface {
/**
* @return Name of this server instance.
*/
ServerName getServerName();
/**
* @return True if this server has an hbase:meta table region.
*/
boolean hasMetaTableRegion();
}
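As an illustration only (not part of this patch), a hypothetical server-scoped procedure would plug into the per-server queue roughly as below; the class name is made up and the usual Procedure plumbing (execute/rollback/abort/state serialization) is elided:

  public class ServerAuditProcedure extends Procedure<MasterProcedureEnv>
      implements ServerProcedureInterface {
    private ServerName serverName;

    @Override
    public ServerName getServerName() {
      return serverName;
    }

    @Override
    public boolean hasMetaTableRegion() {
      return false;   // a read-only audit never carries hbase:meta
    }

    @Override
    protected boolean acquireLock(final MasterProcedureEnv env) {
      // Shared server lock: other read-only work on this server may run concurrently, but an
      // exclusive holder such as ServerCrashProcedure keeps this procedure queued.
      return env.getProcedureQueue().tryAcquireServerSharedLock(this);
    }

    @Override
    protected void releaseLock(final MasterProcedureEnv env) {
      env.getProcedureQueue().releaseServerSharedLock(this);
    }
    // execute/rollback/abort/serializeStateData/deserializeStateData elided for brevity.
  }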

View File

@ -45,4 +45,4 @@ public interface TableProcedureInterface {
* @return the operation type that the procedure is executing.
*/
TableOperationType getTableOperationType();
}
}

View File

@ -184,12 +184,12 @@ public class TruncateTableProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false;
return env.getProcedureQueue().tryAcquireTableWrite(getTableName(), "truncate table");
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table");
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableWrite(getTableName());
env.getProcedureQueue().releaseTableExclusiveLock(getTableName());
}
@Override
@ -288,4 +288,4 @@ public class TruncateTableProcedure
});
}
}
}
}

View File

@ -170,7 +170,7 @@ public class FSHDFSUtils extends FSUtils {
boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
final Configuration conf, final CancelableProgressable reporter)
throws IOException {
LOG.info("Recovering lease on dfs file " + p);
LOG.info("Recover lease on dfs file " + p);
long startWaiting = EnvironmentEdgeManager.currentTime();
// Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
// usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
@ -259,7 +259,7 @@ public class FSHDFSUtils extends FSUtils {
boolean recovered = false;
try {
recovered = dfs.recoverLease(p);
LOG.info("recoverLease=" + recovered + ", " +
LOG.info((recovered? "Recovered lease, ": "Failed to recover lease, ") +
getLogMessageDetail(nbAttempt, p, startWaiting));
} catch (IOException e) {
if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {

View File

@ -635,7 +635,7 @@ public class TestAssignmentManagerOnCluster {
am.getRegionStates().updateRegionState(hri, RegionState.State.PENDING_OPEN, destServerName);
am.getTableStateManager().setTableState(table, TableState.State.DISABLING);
List<HRegionInfo> toAssignRegions = am.processServerShutdown(destServerName);
List<HRegionInfo> toAssignRegions = am.cleanOutCrashedServerReferences(destServerName);
assertTrue("Regions to be assigned should be empty.", toAssignRegions.isEmpty());
assertTrue("Regions to be assigned should be empty.", am.getRegionStates()
.getRegionState(hri).isOffline());
@ -1222,8 +1222,8 @@ public class TestAssignmentManagerOnCluster {
}
@Override
public boolean isServerShutdownHandlerEnabled() {
return enabled.get() && super.isServerShutdownHandlerEnabled();
public boolean isServerCrashProcessingEnabled() {
return enabled.get() && super.isServerCrashProcessingEnabled();
}
public void enableSSH(boolean enabled) {

View File

@ -382,7 +382,7 @@ public class TestCatalogJanitor {
}
@Override
public boolean isServerShutdownHandlerEnabled() {
public boolean isServerCrashProcessingEnabled() {
return true;
}

View File

@ -1750,5 +1750,4 @@ public class TestDistributedLogSplitting {
return hrs;
}
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.junit.Test;
import org.junit.Ignore;
import org.junit.experimental.categories.Category;
@Category({FlakeyTests.class, LargeTests.class})
@ -218,7 +219,7 @@ public class TestMasterFailover {
HMaster master = masterThreads.get(0).getMaster();
assertTrue(master.isActiveMaster());
assertTrue(master.isInitialized());
// Create a table with a region online
Table onlineTable = TEST_UTIL.createTable(TableName.valueOf("onlineTable"), "family");
onlineTable.close();
@ -261,36 +262,36 @@ public class TestMasterFailover {
oldState = new RegionState(hriOffline, State.OFFLINE);
newState = new RegionState(hriOffline, State.PENDING_OPEN, newState.getServerName());
stateStore.updateRegionState(HConstants.NO_SEQNUM, newState, oldState);
HRegionInfo failedClose = new HRegionInfo(offlineTable.getTableName(), null, null);
createRegion(failedClose, rootdir, conf, offlineTable);
MetaTableAccessor.addRegionToMeta(master.getConnection(), failedClose);
oldState = new RegionState(failedClose, State.PENDING_CLOSE);
newState = new RegionState(failedClose, State.FAILED_CLOSE, newState.getServerName());
stateStore.updateRegionState(HConstants.NO_SEQNUM, newState, oldState);
HRegionInfo failedOpen = new HRegionInfo(offlineTable.getTableName(), null, null);
createRegion(failedOpen, rootdir, conf, offlineTable);
MetaTableAccessor.addRegionToMeta(master.getConnection(), failedOpen);
// Simulate a region transitioning to failed open when the region server reports the
// transition as FAILED_OPEN
oldState = new RegionState(failedOpen, State.PENDING_OPEN);
newState = new RegionState(failedOpen, State.FAILED_OPEN, newState.getServerName());
stateStore.updateRegionState(HConstants.NO_SEQNUM, newState, oldState);
HRegionInfo failedOpenNullServer = new HRegionInfo(offlineTable.getTableName(), null, null);
LOG.info("Failed open NUll server " + failedOpenNullServer.getEncodedName());
createRegion(failedOpenNullServer, rootdir, conf, offlineTable);
MetaTableAccessor.addRegionToMeta(master.getConnection(), failedOpenNullServer);
// Simulate a region transitioning to failed open when the master couldn't find a plan for
// the region
oldState = new RegionState(failedOpenNullServer, State.OFFLINE);
newState = new RegionState(failedOpenNullServer, State.FAILED_OPEN, null);
stateStore.updateRegionState(HConstants.NO_SEQNUM, newState, oldState);
// Stop the master
log("Aborting master");
cluster.abortMaster(0);

View File

@ -192,15 +192,26 @@ public class MasterProcedureTestingUtility {
assertTrue(tsm.getTableState(tableName).equals(TableState.State.DISABLED));
}
/**
* Run through all procedure flow states TWICE while also restarting procedure executor at each
* step; i.e. force a reread of the procedure store.
*
*<p>It does
* <ol><li>Execute step N - kill the executor before store update
* <li>Restart executor/store
* <li>Execute step N - and then save to store
* </ol>
*
*<p>This is a good test for finding state that needs persisting and steps that are not
* idempotent. Use this version of the test when a procedure executes all flow steps from start to
* finish.
* @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long)
*/
public static <TState> void testRecoveryAndDoubleExecution(
final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
final int numSteps, final TState[] states) throws Exception {
ProcedureTestingUtility.waitProcedure(procExec, procId);
assertEquals(false, procExec.isRunning());
// Restart the executor and execute the step twice
// execute step N - kill before store update
// restart executor/store
// execute step N - save on store
for (int i = 0; i < numSteps; ++i) {
LOG.info("Restart "+ i +" exec state: " + states[i]);
ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
@ -211,6 +222,35 @@ public class MasterProcedureTestingUtility {
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
}
/**
* Run through all procedure flow states TWICE while also restarting procedure executor at each
* step; i.e. force a reread of the procedure store.
*
*<p>It does
* <ol><li>Execute step N - kill the executor before store update
* <li>Restart executor/store
* <li>Execute step N - and then save to store
* </ol>
*
*<p>This is a good test for finding state that needs persisting and steps that are not
* idempotent. Use this version of the test when the order in which flow steps are executed is
* not start to finish; i.e. where the procedure may vary the flow steps depending on the
* circumstances found.
* @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, Object[])
*/
public static <TState> void testRecoveryAndDoubleExecution(
final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId)
throws Exception {
ProcedureTestingUtility.waitProcedure(procExec, procId);
assertEquals(false, procExec.isRunning());
while (!procExec.isFinished(procId)) {
ProcedureTestingUtility.restart(procExec);
ProcedureTestingUtility.waitProcedure(procExec, procId);
}
assertEquals(true, procExec.isRunning());
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
}
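A hedged usage sketch for the new no-step-array variant above (hypothetical test snippet, not part of this patch; procExec and crashedServer are assumed to be set up by the test, and the executor is assumed to already kill itself before each store update so every restart re-reads the procedure store):

  long procId = procExec.submitProcedure(
      new ServerCrashProcedure(crashedServer, true /* shouldSplitWal */, false /* carryingMeta */));
  // Flow order is not fixed start-to-finish for this procedure, so restart the executor until
  // the procedure reports finished rather than stepping through a known state array.
  MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);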
public static <TState> void testRollbackAndDoubleExecution(
final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
final int lastStep, final TState[] states) throws Exception {

View File

@ -18,14 +18,15 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -38,7 +39,6 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -119,12 +119,12 @@ public class TestMasterProcedureQueue {
// fetch item and take a lock
assertEquals(1, queue.poll().longValue());
// take the xlock
assertTrue(queue.tryAcquireTableWrite(tableName, "write"));
assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write"));
// table can't be deleted because we have the lock
assertEquals(0, queue.size());
assertFalse(queue.markTableAsDeleted(tableName));
// release the xlock
queue.releaseTableWrite(tableName);
queue.releaseTableExclusiveLock(tableName);
// complete the table deletion
assertTrue(queue.markTableAsDeleted(tableName));
}
@ -150,7 +150,7 @@ public class TestMasterProcedureQueue {
// fetch item and take a lock
assertEquals(i, queue.poll().longValue());
// take the rlock
assertTrue(queue.tryAcquireTableRead(tableName, "read " + i));
assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i));
// table can't be deleted because we have locks and/or items in the queue
assertFalse(queue.markTableAsDeleted(tableName));
}
@ -159,7 +159,7 @@ public class TestMasterProcedureQueue {
// table can't be deleted because we have locks
assertFalse(queue.markTableAsDeleted(tableName));
// release the rlock
queue.releaseTableRead(tableName);
queue.releaseTableSharedLock(tableName);
}
// there are no items and no locks in the queue
@ -188,47 +188,47 @@ public class TestMasterProcedureQueue {
// Fetch the 1st item and take the write lock
Long procId = queue.poll();
assertEquals(1, procId.longValue());
assertEquals(true, queue.tryAcquireTableWrite(tableName, "write " + procId));
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
// Fetch the 2nd item and verify that the lock can't be acquired
assertEquals(null, queue.poll());
// Release the write lock and acquire the read lock
queue.releaseTableWrite(tableName);
queue.releaseTableExclusiveLock(tableName);
// Fetch the 2nd item and take the read lock
procId = queue.poll();
assertEquals(2, procId.longValue());
assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
// Fetch the 3rd item and verify that the lock can't be acquired
procId = queue.poll();
assertEquals(3, procId.longValue());
assertEquals(false, queue.tryAcquireTableWrite(tableName, "write " + procId));
assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
// release the rdlock of item 2 and take the wrlock for the 3rd item
queue.releaseTableRead(tableName);
assertEquals(true, queue.tryAcquireTableWrite(tableName, "write " + procId));
queue.releaseTableSharedLock(tableName);
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
// Fetch 4th item and verify that the lock can't be acquired
assertEquals(null, queue.poll());
// Release the write lock and acquire the read lock
queue.releaseTableWrite(tableName);
queue.releaseTableExclusiveLock(tableName);
// Fetch the 4th item and take the read lock
procId = queue.poll();
assertEquals(4, procId.longValue());
assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
// Fetch the 5th item and take the read lock
procId = queue.poll();
assertEquals(5, procId.longValue());
assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
// Release 4th and 5th read-lock
queue.releaseTableRead(tableName);
queue.releaseTableRead(tableName);
queue.releaseTableSharedLock(tableName);
queue.releaseTableSharedLock(tableName);
// remove table queue
assertEquals(0, queue.size());
@ -354,11 +354,11 @@ public class TestMasterProcedureQueue {
case CREATE:
case DELETE:
case EDIT:
avail = queue.tryAcquireTableWrite(proc.getTableName(),
avail = queue.tryAcquireTableExclusiveLock(proc.getTableName(),
"op="+ proc.getTableOperationType());
break;
case READ:
avail = queue.tryAcquireTableRead(proc.getTableName(),
avail = queue.tryAcquireTableSharedLock(proc.getTableName(),
"op="+ proc.getTableOperationType());
break;
}
@ -375,10 +375,10 @@ public class TestMasterProcedureQueue {
case CREATE:
case DELETE:
case EDIT:
queue.releaseTableWrite(proc.getTableName());
queue.releaseTableExclusiveLock(proc.getTableName());
break;
case READ:
queue.releaseTableRead(proc.getTableName());
queue.releaseTableSharedLock(proc.getTableName());
break;
}
}
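To make the renamed lock API concrete, a small sketch of the exclusive/shared pairing this test exercises; the queue and tableName variables are assumed to be set up as in the tests above:

// Sketch only: the exclusive (write) lock blocks all other operations on the table, while
// shared (read) locks can be held by several procedures at once; every successful acquire
// must be paired with the matching release.
if (queue.tryAcquireTableExclusiveLock(tableName, "op=EDIT")) {
  try {
    // ... run the table-mutating flow step ...
  } finally {
    queue.releaseTableExclusiveLock(tableName);
  }
}
// A failed tryAcquire returns false; as in the tests above, the caller retries later.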

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Runs first with distributed log splitting (DLS) and then with distributed log replay (DLR).
*/
@Category(LargeTests.class)
@RunWith(Parameterized.class)
public class TestServerCrashProcedure {
// Ugly junit parameterization. I just want to pass false and then true, but it seems I need
// to return a sequence of two-element arrays.
@Parameters(name = "{index}: setting={0}")
public static Collection<Object []> data() {
return Arrays.asList(new Object[][] {{Boolean.FALSE, -1}, {Boolean.TRUE, -1}});
}
private final HBaseTestingUtility util = new HBaseTestingUtility();
@Before
public void setup() throws Exception {
this.util.startMiniCluster(3);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
this.util.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false);
}
@After
public void tearDown() throws Exception {
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
this.util.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false);
this.util.shutdownMiniCluster();
}
public TestServerCrashProcedure(final Boolean b, final int ignore) {
this.util.getConfiguration().setBoolean("hbase.master.distributed.log.replay", b);
this.util.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
}
/**
* Run server crash procedure steps twice to test idempotency and that we are persisting all
* needed state.
* @throws Exception
*/
@Test(timeout = 300000)
public void testRecoveryAndDoubleExecutionOnline() throws Exception {
final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecutionOnline");
this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
try (Table t = this.util.getConnection().getTable(tableName)) {
// Load the table with a bit of data so there are some logs to split and some edits in each region.
this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
int count = countRows(t);
// Run the procedure executor outside the master so we can mess with it. Need to disable
// Master's running of the server crash processing.
HMaster master = this.util.getHBaseCluster().getMaster();
final ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
master.setServerCrashProcessingEnabled(false);
// Kill a server. Master will notice but do nothing other than add it to the list of dead servers.
HRegionServer hrs = this.util.getHBaseCluster().getRegionServer(0);
boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(hrs.getServerName());
this.util.getHBaseCluster().killRegionServer(hrs.getServerName());
hrs.join();
// Wait until the expiration of the server has arrived at the master. We won't process it
// by queuing a ServerCrashProcedure because we have disabled crash processing... but wait
// here so ServerManager gets notice and adds the expired server to the appropriate queues.
while (!master.getServerManager().isServerDead(hrs.getServerName())) Threads.sleep(10);
// Now, reenable processing else we can't get a lock on the ServerCrashProcedure.
master.setServerCrashProcessingEnabled(true);
// Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
master.getServerManager().moveFromOnelineToDeadServers(hrs.getServerName());
// Enable test flags and then queue the crash procedure.
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
long procId =
procExec.submitProcedure(new ServerCrashProcedure(hrs.getServerName(), true, carryingMeta));
// Now run through the procedure twice crashing the executor on each step...
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
// Assert all data came back.
assertEquals(count, countRows(t));
}
}
int countRows(final Table t) throws IOException {
int count = 0;
try (ResultScanner scanner = t.getScanner(new Scan())) {
while(scanner.next() != null) count++;
}
return count;
}
}

View File

@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.snapshot;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
@ -31,21 +33,14 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.TestTableName;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({ MediumTests.class })
public class TestSnapshotClientRetries {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();