HBASE-20700 Move meta region when server crash can cause the procedure to be stuck
This commit is contained in:
parent
cc7aefe0bb
commit
573b57d437
|
@ -22,5 +22,5 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
|
||||
@InterfaceAudience.Private
|
||||
public enum LockedResourceType {
|
||||
SERVER, NAMESPACE, TABLE, REGION, PEER
|
||||
SERVER, NAMESPACE, TABLE, REGION, PEER, META
|
||||
}
|
||||
|
|
|
@ -910,7 +910,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
// Bring up hbase:meta. recoverMeta is a blocking call waiting until hbase:meta is deployed.
|
||||
// It also starts the TableStateManager.
|
||||
MasterMetaBootstrap metaBootstrap = createMetaBootstrap(this, status);
|
||||
MasterMetaBootstrap metaBootstrap = createMetaBootstrap();
|
||||
metaBootstrap.recoverMeta();
|
||||
|
||||
//Initialize after meta as it scans meta
|
||||
|
@ -1055,12 +1055,18 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Create a {@link MasterMetaBootstrap} instance.
|
||||
* </p>
|
||||
* <p>
|
||||
* Will be overridden in tests.
|
||||
* </p>
|
||||
*/
|
||||
MasterMetaBootstrap createMetaBootstrap(final HMaster master, final MonitoredTask status) {
|
||||
@VisibleForTesting
|
||||
protected MasterMetaBootstrap createMetaBootstrap() {
|
||||
// We put this out here in a method so we can do a Mockito.spy and stub it out
|
||||
// w/ a mocked up MasterMetaBootstrap.
|
||||
return new MasterMetaBootstrap(master, status);
|
||||
return new MasterMetaBootstrap(this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3161,7 +3167,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
cpHost.preGetLocks();
|
||||
}
|
||||
|
||||
MasterProcedureScheduler procedureScheduler = procedureExecutor.getEnvironment().getProcedureScheduler();
|
||||
MasterProcedureScheduler procedureScheduler =
|
||||
procedureExecutor.getEnvironment().getProcedureScheduler();
|
||||
|
||||
final List<LockedResource> lockedResources = procedureScheduler.getLocks();
|
||||
|
||||
|
@ -3606,10 +3613,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
@Override
|
||||
public boolean recoverMeta() throws IOException {
|
||||
ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
|
||||
// we need to block here so the latch should be greater than the current version to make sure
|
||||
// that we will block.
|
||||
ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(Integer.MAX_VALUE, 0);
|
||||
procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch));
|
||||
latch.await();
|
||||
LOG.info("hbase:meta deployed at=" +
|
||||
LOG.info("hbase:meta deployed at={}",
|
||||
getMetaTableLocator().getMetaRegionLocation(getZooKeeper()));
|
||||
return assignmentManager.isMetaInitialized();
|
||||
}
|
||||
|
|
|
@ -21,14 +21,12 @@ package org.apache.hadoop.hbase.master;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
|
@ -44,12 +42,10 @@ import org.slf4j.LoggerFactory;
|
|||
public class MasterMetaBootstrap {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MasterMetaBootstrap.class);
|
||||
|
||||
private final MonitoredTask status;
|
||||
private final HMaster master;
|
||||
|
||||
public MasterMetaBootstrap(final HMaster master, final MonitoredTask status) {
|
||||
public MasterMetaBootstrap(HMaster master) {
|
||||
this.master = master;
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public void recoverMeta() throws InterruptedException, IOException {
|
||||
|
@ -58,7 +54,7 @@ public class MasterMetaBootstrap {
|
|||
// Now we can start the TableStateManager. It is backed by hbase:meta.
|
||||
master.getTableStateManager().start();
|
||||
// Enable server crash procedure handling
|
||||
enableCrashedServerProcessing(false);
|
||||
enableCrashedServerProcessing();
|
||||
}
|
||||
|
||||
public void processDeadServers() {
|
||||
|
@ -142,8 +138,7 @@ public class MasterMetaBootstrap {
|
|||
}
|
||||
}
|
||||
|
||||
private void enableCrashedServerProcessing(final boolean waitForMeta)
|
||||
throws InterruptedException {
|
||||
private void enableCrashedServerProcessing() throws InterruptedException {
|
||||
// If crashed server processing is disabled, we enable it and expire those dead but not expired
|
||||
// servers. This is required so that if meta is assigning to a server which dies after
|
||||
// assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
|
||||
|
@ -152,9 +147,5 @@ public class MasterMetaBootstrap {
|
|||
master.setServerCrashProcessingEnabled(true);
|
||||
master.getServerManager().processQueuedDeadServers();
|
||||
}
|
||||
|
||||
if (waitForMeta) {
|
||||
master.getMetaTableLocator().waitMetaRegionLocation(master.getZooKeeper());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -79,16 +78,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
|
||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||
import org.apache.hadoop.hbase.regionserver.SequenceId;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||
|
@ -96,6 +88,15 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||
|
||||
/**
|
||||
* The AssignmentManager is the coordinator for region assign/unassign operations.
|
||||
* <ul>
|
||||
|
@ -966,7 +967,7 @@ public class AssignmentManager implements ServerListener {
|
|||
final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
|
||||
|
||||
synchronized (serverNode) {
|
||||
if (serverNode.isInState(ServerState.SPLITTING, ServerState.OFFLINE)) {
|
||||
if (!serverNode.isInState(ServerState.ONLINE)) {
|
||||
LOG.warn("Got a report from a server result in state " + serverNode.getState());
|
||||
return;
|
||||
}
|
||||
|
@ -1918,22 +1919,38 @@ public class AssignmentManager implements ServerListener {
|
|||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* This is a very particular check. The {@link org.apache.hadoop.hbase.master.ServerManager} is
|
||||
* where you go to check on state of 'Servers', what Servers are online, etc. Here we are
|
||||
* checking the state of a server that is post expiration, a ServerManager function that moves a
|
||||
* server from online to dead. Here we are seeing if the server has moved beyond a particular
|
||||
* point in the recovery process such that it is safe to move on with assigns; etc.
|
||||
* @return True if this Server does not exist or if does and it is marked as OFFLINE (which
|
||||
* happens after all WALs have been split on this server making it so assigns, etc. can
|
||||
* proceed). If null, presumes the ServerStateNode was cleaned up by SCP.
|
||||
* where you go to check on state of 'Servers', what Servers are online, etc.
|
||||
* </p>
|
||||
* <p>
|
||||
* Here we are checking the state of a server that is post expiration, a ServerManager function
|
||||
* that moves a server from online to dead. Here we are seeing if the server has moved beyond a
|
||||
* particular point in the recovery process such that it is safe to move on with assigns; etc.
|
||||
* </p>
|
||||
* <p>
|
||||
* For now it is only used in
|
||||
* {@link UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)} to
|
||||
* see whether we can safely quit without losing data.
|
||||
* </p>
|
||||
* @param meta whether to check for meta log splitting
|
||||
* @return {@code true} if the server does not exist or the log splitting is done, i.e., the server
|
||||
* is in OFFLINE state, or for meta log, has reached SPLITTING_META_DONE or a later state. If null,
|
||||
* presumes the ServerStateNode was cleaned up by SCP.
|
||||
* @see UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)
|
||||
*/
|
||||
boolean isDeadServerProcessed(final ServerName serverName) {
|
||||
boolean isLogSplittingDone(ServerName serverName, boolean meta) {
|
||||
ServerStateNode ssn = this.regionStates.getServerNode(serverName);
|
||||
if (ssn == null) {
|
||||
return true;
|
||||
}
|
||||
ServerState[] inState =
|
||||
meta
|
||||
? new ServerState[] { ServerState.SPLITTING_META_DONE, ServerState.SPLITTING,
|
||||
ServerState.OFFLINE }
|
||||
: new ServerState[] { ServerState.OFFLINE };
|
||||
synchronized (ssn) {
|
||||
return ssn.isOffline();
|
||||
return ssn.isInState(inState);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -317,13 +317,27 @@ public class RegionStates {
|
|||
*/
|
||||
ONLINE,
|
||||
|
||||
/**
|
||||
* Only a server which carries meta can have this state. We will split wal for meta and then
|
||||
* assign meta first before splitting other wals.
|
||||
*/
|
||||
SPLITTING_META,
|
||||
|
||||
/**
|
||||
* Indicate that the meta splitting is done. We need this state so that the UnassignProcedure
|
||||
* for meta can safely quit. See the comments in UnassignProcedure.remoteCallFailed for more
|
||||
* details.
|
||||
*/
|
||||
SPLITTING_META_DONE,
|
||||
|
||||
/**
|
||||
* Server expired/crashed. Currently undergoing WAL splitting.
|
||||
*/
|
||||
SPLITTING,
|
||||
|
||||
/**
|
||||
* WAL splitting done.
|
||||
* WAL splitting done. This state will be used to tell the UnassignProcedure that it can safely
|
||||
* quit. See the comments in UnassignProcedure.remoteCallFailed for more details.
|
||||
*/
|
||||
OFFLINE
|
||||
}
|
||||
|
@ -357,10 +371,6 @@ public class RegionStates {
|
|||
return reportEvent;
|
||||
}
|
||||
|
||||
public boolean isOffline() {
|
||||
return this.state.equals(ServerState.OFFLINE);
|
||||
}
|
||||
|
||||
public boolean isInState(final ServerState... expected) {
|
||||
boolean expectedState = false;
|
||||
if (expected != null) {
|
||||
|
@ -371,7 +381,7 @@ public class RegionStates {
|
|||
return expectedState;
|
||||
}
|
||||
|
||||
public void setState(final ServerState state) {
|
||||
private void setState(final ServerState state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
|
@ -612,18 +622,40 @@ public class RegionStates {
|
|||
}
|
||||
|
||||
// ============================================================================================
|
||||
// TODO: split helpers
|
||||
// Split helpers
|
||||
// These methods will only be called in ServerCrashProcedure, and at the end of SCP we will remove
|
||||
// the ServerStateNode by calling removeServer.
|
||||
// ============================================================================================
|
||||
|
||||
private void setServerState(ServerName serverName, ServerState state) {
|
||||
ServerStateNode serverNode = getOrCreateServer(serverName);
|
||||
synchronized (serverNode) {
|
||||
serverNode.setState(state);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this when we start log splitting a crashed Server.
|
||||
* Call this when we start meta log splitting a crashed Server.
|
||||
* @see #metaLogSplit(ServerName)
|
||||
*/
|
||||
public void metaLogSplitting(ServerName serverName) {
|
||||
setServerState(serverName, ServerState.SPLITTING_META);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after we've split the meta logs on a crashed Server.
|
||||
* @see #metaLogSplitting(ServerName)
|
||||
*/
|
||||
public void metaLogSplit(ServerName serverName) {
|
||||
setServerState(serverName, ServerState.SPLITTING_META_DONE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this when we start log splitting for a crashed Server.
|
||||
* @see #logSplit(ServerName)
|
||||
*/
|
||||
public void logSplitting(final ServerName serverName) {
|
||||
final ServerStateNode serverNode = getOrCreateServer(serverName);
|
||||
synchronized (serverNode) {
|
||||
serverNode.setState(ServerState.SPLITTING);
|
||||
}
|
||||
setServerState(serverName, ServerState.SPLITTING);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -631,17 +663,7 @@ public class RegionStates {
|
|||
* @see #logSplitting(ServerName)
|
||||
*/
|
||||
public void logSplit(final ServerName serverName) {
|
||||
final ServerStateNode serverNode = getOrCreateServer(serverName);
|
||||
synchronized (serverNode) {
|
||||
serverNode.setState(ServerState.OFFLINE);
|
||||
}
|
||||
}
|
||||
|
||||
public void logSplit(final RegionInfo regionInfo) {
|
||||
final RegionStateNode regionNode = getRegionStateNode(regionInfo);
|
||||
synchronized (regionNode) {
|
||||
regionNode.setState(State.SPLIT);
|
||||
}
|
||||
setServerState(serverName, ServerState.OFFLINE);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -213,7 +213,7 @@ public abstract class RegionTransitionProcedure
|
|||
RegionStateNode regionNode, IOException exception);
|
||||
|
||||
@Override
|
||||
public void remoteCallFailed(final MasterProcedureEnv env,
|
||||
public synchronized void remoteCallFailed(final MasterProcedureEnv env,
|
||||
final ServerName serverName, final IOException exception) {
|
||||
final RegionStateNode regionNode = getRegionState(env);
|
||||
LOG.warn("Remote call failed {}; {}; {}; exception={}", serverName,
|
||||
|
|
|
@ -310,12 +310,29 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
exception.getClass().getSimpleName());
|
||||
if (!env.getMasterServices().getServerManager().expireServer(serverName)) {
|
||||
// Failed to queue an expire. Lots of possible reasons including it may be already expired.
|
||||
// If so, is it beyond the state where we will be woken-up if go ahead and suspend the
|
||||
// procedure. Look for this rare condition.
|
||||
if (env.getAssignmentManager().isDeadServerProcessed(serverName)) {
|
||||
// In ServerCrashProcedure and RecoverMetaProcedure, there is a handleRIT stage where we
|
||||
// will iterate over all the RIT procedures for the related regions of a crashed RS and
|
||||
// fail them with ServerCrashException. You can see the isSafeToProceed method above for
|
||||
// more details.
|
||||
// This can work for most cases, but since we do not hold the region lock in handleRIT,
|
||||
// there could be a race where we arrive here after the handleRIT stage of the SCP. So here we
|
||||
// need to check whether it is safe to quit.
|
||||
// Notice that, the first assumption is that we can only quit after the log splitting is
|
||||
// done, as MRP can schedule an AssignProcedure right after us, and if the log splitting has
|
||||
// not been done then there will be data loss. And in SCP, we will change the state from
|
||||
// SPLITTING to OFFLINE(or SPLITTING_META_DONE for meta log processing) after finishing the
|
||||
// log splitting, and then calling handleRIT, so checking the state here can be a safe
|
||||
// fence. If the state is not OFFLINE(or SPLITTING_META_DONE), then we can just leave this
|
||||
// procedure in suspended state as we can make sure that the handleRIT has not been executed
|
||||
// yet and it will wake us up later. And if the state is OFFLINE(or SPLITTING_META_DONE), we
|
||||
// can safely quit since there will be no data loss. There could be duplicated
|
||||
// AssignProcedures for the same region but it is OK as we will do a check at the beginning
|
||||
// of AssignProcedure to prevent double assign. And there we have region lock so there will
|
||||
// be no race.
|
||||
if (env.getAssignmentManager().isLogSplittingDone(serverName, isMeta())) {
|
||||
// Its ok to proceed with this unassign.
|
||||
LOG.info("{} is dead and processed; moving procedure to finished state; {}",
|
||||
serverName, this);
|
||||
LOG.info("{} is dead and processed; moving procedure to finished state; {}", serverName,
|
||||
this);
|
||||
proceed(env, regionNode);
|
||||
// Return true; wake up the procedure so we can act on proceed.
|
||||
return true;
|
||||
|
|
|
@ -101,14 +101,18 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
(n, k) -> n.compareKey((TableName) k);
|
||||
private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
|
||||
(n, k) -> n.compareKey((String) k);
|
||||
private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
|
||||
(n, k) -> n.compareKey((TableName) k);
|
||||
|
||||
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
|
||||
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
|
||||
private final FairQueue<String> peerRunQueue = new FairQueue<>();
|
||||
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
|
||||
|
||||
private final ServerQueue[] serverBuckets = new ServerQueue[128];
|
||||
private TableQueue tableMap = null;
|
||||
private PeerQueue peerMap = null;
|
||||
private MetaQueue metaMap = null;
|
||||
|
||||
private final SchemaLocking locking = new SchemaLocking();
|
||||
|
||||
|
@ -119,7 +123,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
|
||||
@Override
|
||||
protected void enqueue(final Procedure proc, final boolean addFront) {
|
||||
if (isTableProcedure(proc)) {
|
||||
if (isMetaProcedure(proc)) {
|
||||
doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
|
||||
} else if (isTableProcedure(proc)) {
|
||||
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
|
||||
} else if (isServerProcedure(proc)) {
|
||||
doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
|
||||
|
@ -153,16 +159,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
|
||||
@Override
|
||||
protected boolean queueHasRunnables() {
|
||||
return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables() ||
|
||||
peerRunQueue.hasRunnables();
|
||||
return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
|
||||
serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure dequeue() {
|
||||
// meta procedure is always the first priority
|
||||
Procedure<?> pollResult = doPoll(metaRunQueue);
|
||||
// For now, let server handling have precedence over table handling; presumption is that it
|
||||
// is more important handling crashed servers than it is running the
|
||||
// enabling/disabling tables, etc.
|
||||
Procedure<?> pollResult = doPoll(serverRunQueue);
|
||||
if (pollResult == null) {
|
||||
pollResult = doPoll(serverRunQueue);
|
||||
}
|
||||
if (pollResult == null) {
|
||||
pollResult = doPoll(peerRunQueue);
|
||||
}
|
||||
|
@ -263,31 +273,24 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
private int queueSize(Queue<?> head) {
|
||||
int count = 0;
|
||||
AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
|
||||
while (iter.hasNext()) {
|
||||
count += iter.next().size();
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int queueSize() {
|
||||
int count = 0;
|
||||
|
||||
// Server queues
|
||||
final AvlTreeIterator<ServerQueue> serverIter = new AvlTreeIterator<>();
|
||||
for (int i = 0; i < serverBuckets.length; ++i) {
|
||||
serverIter.seekFirst(serverBuckets[i]);
|
||||
while (serverIter.hasNext()) {
|
||||
count += serverIter.next().size();
|
||||
for (ServerQueue serverMap : serverBuckets) {
|
||||
count += queueSize(serverMap);
|
||||
}
|
||||
}
|
||||
|
||||
// Table queues
|
||||
final AvlTreeIterator<TableQueue> tableIter = new AvlTreeIterator<>(tableMap);
|
||||
while (tableIter.hasNext()) {
|
||||
count += tableIter.next().size();
|
||||
}
|
||||
|
||||
// Peer queues
|
||||
final AvlTreeIterator<PeerQueue> peerIter = new AvlTreeIterator<>(peerMap);
|
||||
while (peerIter.hasNext()) {
|
||||
count += peerIter.next().size();
|
||||
}
|
||||
|
||||
count += queueSize(tableMap);
|
||||
count += queueSize(peerMap);
|
||||
count += queueSize(metaMap);
|
||||
return count;
|
||||
}
|
||||
|
||||
|
@ -430,6 +433,22 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
return ((PeerProcedureInterface) proc).getPeerId();
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Meta Queue Lookup Helpers
|
||||
// ============================================================================
|
||||
private MetaQueue getMetaQueue() {
|
||||
MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
|
||||
if (node != null) {
|
||||
return node;
|
||||
}
|
||||
node = new MetaQueue(locking.getMetaLock());
|
||||
metaMap = AvlTree.insert(metaMap, node);
|
||||
return node;
|
||||
}
|
||||
|
||||
private static boolean isMetaProcedure(Procedure<?> proc) {
|
||||
return proc instanceof MetaProcedureInterface;
|
||||
}
|
||||
// ============================================================================
|
||||
// Table Locking Helpers
|
||||
// ============================================================================
|
||||
|
@ -866,6 +885,49 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Meta Locking Helpers
|
||||
// ============================================================================
|
||||
/**
|
||||
* Try to acquire the exclusive lock on meta.
|
||||
* @see #wakeMetaExclusiveLock(Procedure)
|
||||
* @param procedure the procedure trying to acquire the lock
|
||||
* @return true if the procedure has to wait for meta to be available
|
||||
*/
|
||||
public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
|
||||
schedLock();
|
||||
try {
|
||||
final LockAndQueue lock = locking.getMetaLock();
|
||||
if (lock.tryExclusiveLock(procedure)) {
|
||||
removeFromRunQueue(metaRunQueue, getMetaQueue());
|
||||
return false;
|
||||
}
|
||||
waitProcedure(lock, procedure);
|
||||
logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
|
||||
return true;
|
||||
} finally {
|
||||
schedUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wake the procedures waiting for meta.
|
||||
* @see #waitMetaExclusiveLock(Procedure)
|
||||
* @param procedure the procedure releasing the lock
|
||||
*/
|
||||
public void wakeMetaExclusiveLock(Procedure<?> procedure) {
|
||||
schedLock();
|
||||
try {
|
||||
final LockAndQueue lock = locking.getMetaLock();
|
||||
lock.releaseExclusiveLock(procedure);
|
||||
addToRunQueue(metaRunQueue, getMetaQueue());
|
||||
int waitingCount = wakeWaitingProcedures(lock);
|
||||
wakePollIfNeeded(waitingCount);
|
||||
} finally {
|
||||
schedUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For debugging. Expensive.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public interface MetaProcedureInterface {
|
||||
|
||||
enum MetaOperationType {
|
||||
RECOVER
|
||||
}
|
||||
|
||||
default MetaOperationType getMetaOperationType() {
|
||||
return MetaOperationType.RECOVER;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.procedure2.LockStatus;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
class MetaQueue extends Queue<TableName> {
|
||||
|
||||
protected MetaQueue(LockStatus lockStatus) {
|
||||
super(TableName.META_TABLE_NAME, 1, lockStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean requireExclusiveLock(Procedure<?> proc) {
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -18,10 +18,8 @@
|
|||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public interface PeerProcedureInterface {
|
||||
|
||||
enum PeerOperationType {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -15,13 +15,11 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
|
@ -54,7 +52,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
|
|||
@InterfaceAudience.Private
|
||||
public class RecoverMetaProcedure
|
||||
extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.RecoverMetaState>
|
||||
implements TableProcedureInterface {
|
||||
implements MetaProcedureInterface {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RecoverMetaProcedure.class);
|
||||
|
||||
private ServerName failedMetaServer;
|
||||
|
@ -125,21 +123,25 @@ public class RecoverMetaProcedure
|
|||
LOG.info("Start " + this);
|
||||
if (shouldSplitWal) {
|
||||
// TODO: Matteo. We BLOCK here but most important thing to be doing at this moment.
|
||||
AssignmentManager am = env.getMasterServices().getAssignmentManager();
|
||||
if (failedMetaServer != null) {
|
||||
am.getRegionStates().metaLogSplitting(failedMetaServer);
|
||||
master.getMasterWalManager().splitMetaLog(failedMetaServer);
|
||||
am.getRegionStates().metaLogSplit(failedMetaServer);
|
||||
} else {
|
||||
ServerName serverName =
|
||||
master.getMetaTableLocator().getMetaRegionLocation(master.getZooKeeper());
|
||||
Set<ServerName> previouslyFailedServers =
|
||||
master.getMasterWalManager().getFailedServersFromLogFolders();
|
||||
if (serverName != null && previouslyFailedServers.contains(serverName)) {
|
||||
am.getRegionStates().metaLogSplitting(serverName);
|
||||
master.getMasterWalManager().splitMetaLog(serverName);
|
||||
am.getRegionStates().metaLogSplit(serverName);
|
||||
}
|
||||
}
|
||||
}
|
||||
setNextState(RecoverMetaState.RECOVER_META_ASSIGN_REGIONS);
|
||||
break;
|
||||
|
||||
case RECOVER_META_ASSIGN_REGIONS:
|
||||
RegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(
|
||||
RegionInfoBuilder.FIRST_META_REGIONINFO, this.replicaId);
|
||||
|
@ -258,7 +260,7 @@ public class RecoverMetaProcedure
|
|||
|
||||
@Override
|
||||
protected LockState acquireLock(MasterProcedureEnv env) {
|
||||
if (env.getProcedureScheduler().waitTableExclusiveLock(this, TableName.META_TABLE_NAME)) {
|
||||
if (env.getProcedureScheduler().waitMetaExclusiveLock(this)) {
|
||||
return LockState.LOCK_EVENT_WAIT;
|
||||
}
|
||||
return LockState.LOCK_ACQUIRED;
|
||||
|
@ -266,7 +268,7 @@ public class RecoverMetaProcedure
|
|||
|
||||
@Override
|
||||
protected void releaseLock(MasterProcedureEnv env) {
|
||||
env.getProcedureScheduler().wakeTableExclusiveLock(this, TableName.META_TABLE_NAME);
|
||||
env.getProcedureScheduler().wakeMetaExclusiveLock(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -274,16 +276,6 @@ public class RecoverMetaProcedure
|
|||
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableName() {
|
||||
return TableName.META_TABLE_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return TableOperationType.ENABLE;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if failedMetaServer is not null (meta carrying server crashed) or meta is
|
||||
* already initialized
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.procedure2.LockedResourceType;
|
|||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Locks on namespaces, tables, and regions.
|
||||
|
@ -49,6 +51,7 @@ class SchemaLocking {
|
|||
// Single map for all regions irrespective of tables. Key is encoded region name.
|
||||
private final Map<String, LockAndQueue> regionLocks = new HashMap<>();
|
||||
private final Map<String, LockAndQueue> peerLocks = new HashMap<>();
|
||||
private final LockAndQueue metaLock = new LockAndQueue();
|
||||
|
||||
private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) {
|
||||
LockAndQueue lock = map.get(key);
|
||||
|
@ -75,6 +78,10 @@ class SchemaLocking {
|
|||
return getLock(regionLocks, encodedRegionName);
|
||||
}
|
||||
|
||||
LockAndQueue getMetaLock() {
|
||||
return metaLock;
|
||||
}
|
||||
|
||||
LockAndQueue removeRegionLock(String encodedRegionName) {
|
||||
return regionLocks.remove(encodedRegionName);
|
||||
}
|
||||
|
@ -144,6 +151,8 @@ class SchemaLocking {
|
|||
addToLockedResources(lockedResources, regionLocks, Function.identity(),
|
||||
LockedResourceType.REGION);
|
||||
addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER);
|
||||
addToLockedResources(lockedResources, ImmutableMap.of(TableName.META_TABLE_NAME, metaLock),
|
||||
tn -> tn.getNameAsString(), LockedResourceType.META);
|
||||
return lockedResources;
|
||||
}
|
||||
|
||||
|
@ -169,6 +178,8 @@ class SchemaLocking {
|
|||
case PEER:
|
||||
queue = peerLocks.get(resourceName);
|
||||
break;
|
||||
case META:
  queue = metaLock;
  // Stop here: without this break control falls through to the default arm below,
  // which overwrites queue with null and makes the META lock impossible to look up.
  break;
|
||||
default:
|
||||
queue = null;
|
||||
break;
|
||||
|
@ -193,7 +204,8 @@ class SchemaLocking {
|
|||
return "serverLocks=" + filterUnlocked(this.serverLocks) + ", namespaceLocks=" +
|
||||
filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks) +
|
||||
", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks=" +
|
||||
filterUnlocked(this.peerLocks);
|
||||
filterUnlocked(this.peerLocks) + ", metaLocks=" +
|
||||
filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock));
|
||||
}
|
||||
|
||||
private String filterUnlocked(Map<?, LockAndQueue> locks) {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -163,11 +163,11 @@ implements ServerProcedureInterface {
|
|||
"; cycles=" + getCycles());
|
||||
}
|
||||
// Handle RIT against crashed server. Will cancel any ongoing assigns/unassigns.
|
||||
// Returns list of regions we need to reassign. NOTE: there is nothing to stop a
|
||||
// dispatch happening AFTER this point. Check for the condition if a dispatch RPC fails
|
||||
// inside in AssignProcedure/UnassignProcedure. AssignProcedure just keeps retrying.
|
||||
// UnassignProcedure is more complicated. See where it does the check by calling
|
||||
// am#isDeadServerProcessed.
|
||||
// Returns list of regions we need to reassign.
|
||||
// NOTE: there is nothing to stop a dispatch happening AFTER this point. Check for the
|
||||
// condition if a dispatch RPC fails inside in AssignProcedure/UnassignProcedure.
|
||||
// AssignProcedure just keeps retrying. UnassignProcedure is more complicated. See where
|
||||
// it does the check by calling am#isLogSplittingDone.
|
||||
List<RegionInfo> toAssign = handleRIT(env, regionsOnCrashedServer);
|
||||
AssignmentManager am = env.getAssignmentManager();
|
||||
// CreateAssignProcedure will try to use the old location for the region deploy.
|
||||
|
|
|
@ -19,14 +19,12 @@ package org.apache.hadoop.hbase.master.procedure;
|
|||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Procedures that handle servers -- e.g. server crash -- must implement this Interface.
|
||||
* It is used by the procedure runner to figure out locking and queuing.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public interface ServerProcedureInterface {
|
||||
public enum ServerOperationType {
|
||||
CRASH_HANDLER
|
||||
|
|
|
@ -15,19 +15,16 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Procedures that operate on a specific Table (e.g. create, delete, snapshot, ...)
|
||||
* must implement this interface to allow the system to handle the lock/concurrency problems.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public interface TableProcedureInterface {
|
||||
public enum TableOperationType {
|
||||
CREATE, DELETE, DISABLE, EDIT, ENABLE, READ,
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
|
||||
public class DummyRegionProcedure
|
||||
extends AbstractStateMachineRegionProcedure<DummyRegionProcedureState> {
|
||||
|
||||
private final CountDownLatch arrive = new CountDownLatch(1);
|
||||
|
||||
private final CountDownLatch resume = new CountDownLatch(1);
|
||||
|
||||
public DummyRegionProcedure() {
|
||||
}
|
||||
|
||||
public DummyRegionProcedure(MasterProcedureEnv env, RegionInfo hri) {
|
||||
super(env, hri);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return TableOperationType.REGION_EDIT;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(MasterProcedureEnv env, DummyRegionProcedureState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
arrive.countDown();
|
||||
resume.await();
|
||||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollbackState(MasterProcedureEnv env, DummyRegionProcedureState state)
|
||||
throws IOException, InterruptedException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DummyRegionProcedureState getState(int stateId) {
|
||||
return DummyRegionProcedureState.STATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getStateId(DummyRegionProcedureState state) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DummyRegionProcedureState getInitialState() {
|
||||
return DummyRegionProcedureState.STATE;
|
||||
}
|
||||
|
||||
public void waitUntilArrive() throws InterruptedException {
|
||||
arrive.await();
|
||||
}
|
||||
|
||||
public void resume() {
|
||||
resume.countDown();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
/**
 * State type for {@code DummyRegionProcedure}. The procedure has exactly one step, so the
 * enum carries a single constant.
 */
public enum DummyRegionProcedureState {
  /** The only state. */
  STATE
}
|
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
|||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -261,8 +260,8 @@ public class TestMasterNoCluster {
|
|||
|
||||
HMaster master = new HMaster(conf) {
|
||||
@Override
|
||||
MasterMetaBootstrap createMetaBootstrap(final HMaster master, final MonitoredTask status) {
|
||||
return new MasterMetaBootstrap(this, status) {
|
||||
protected MasterMetaBootstrap createMetaBootstrap() {
|
||||
return new MasterMetaBootstrap(this) {
|
||||
@Override
|
||||
protected void assignMetaReplicas()
|
||||
throws IOException, InterruptedException, KeeperException {
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.AsyncAdmin;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MasterTests.class, LargeTests.class })
|
||||
public class TestServerCrashProcedureCarryingMetaStuck {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestServerCrashProcedureCarryingMetaStuck.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.startMiniCluster(3);
|
||||
UTIL.getAdmin().balancerSwitch(false, true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
RegionServerThread rsThread = null;
|
||||
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
if (!t.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty()) {
|
||||
rsThread = t;
|
||||
break;
|
||||
}
|
||||
}
|
||||
HRegionServer rs = rsThread.getRegionServer();
|
||||
RegionInfo hri = rs.getRegions(TableName.META_TABLE_NAME).get(0).getRegionInfo();
|
||||
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
|
||||
ProcedureExecutor<MasterProcedureEnv> executor = master.getMasterProcedureExecutor();
|
||||
DummyRegionProcedure proc = new DummyRegionProcedure(executor.getEnvironment(), hri);
|
||||
long procId = master.getMasterProcedureExecutor().submitProcedure(proc);
|
||||
proc.waitUntilArrive();
|
||||
try (AsyncConnection conn =
|
||||
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get()) {
|
||||
AsyncAdmin admin = conn.getAdmin();
|
||||
CompletableFuture<Void> future = admin.move(hri.getRegionName());
|
||||
rs.abort("For testing!");
|
||||
|
||||
UTIL.waitFor(30000,
|
||||
() -> executor.getProcedures().stream().filter(p -> p instanceof AssignProcedure)
|
||||
.map(p -> (AssignProcedure) p)
|
||||
.anyMatch(p -> Bytes.equals(hri.getRegionName(), p.getRegionInfo().getRegionName())));
|
||||
proc.resume();
|
||||
UTIL.waitFor(30000, () -> executor.isFinished(procId));
|
||||
// see whether the move region procedure can finish properly
|
||||
future.get(30, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,12 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -31,11 +27,8 @@ import org.apache.hadoop.hbase.client.AsyncConnection;
|
|||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
|
@ -63,58 +56,6 @@ public class TestServerCrashProcedureStuck {
|
|||
|
||||
private static byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
private static CountDownLatch ARRIVE = new CountDownLatch(1);
|
||||
|
||||
private static CountDownLatch RESUME = new CountDownLatch(1);
|
||||
|
||||
public enum DummyState {
|
||||
STATE
|
||||
}
|
||||
|
||||
public static final class DummyRegionProcedure
|
||||
extends AbstractStateMachineRegionProcedure<DummyState> {
|
||||
|
||||
public DummyRegionProcedure() {
|
||||
}
|
||||
|
||||
public DummyRegionProcedure(MasterProcedureEnv env, RegionInfo hri) {
|
||||
super(env, hri);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return TableOperationType.REGION_EDIT;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(MasterProcedureEnv env, DummyState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
ARRIVE.countDown();
|
||||
RESUME.await();
|
||||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollbackState(MasterProcedureEnv env, DummyState state)
|
||||
throws IOException, InterruptedException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DummyState getState(int stateId) {
|
||||
return DummyState.STATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getStateId(DummyState state) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DummyState getInitialState() {
|
||||
return DummyState.STATE;
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.startMiniCluster(3);
|
||||
|
@ -129,8 +70,7 @@ public class TestServerCrashProcedureStuck {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void test()
|
||||
throws IOException, InterruptedException, ExecutionException, TimeoutException {
|
||||
public void test() throws Exception {
|
||||
RegionServerThread rsThread = null;
|
||||
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
if (!t.getRegionServer().getRegions(TABLE_NAME).isEmpty()) {
|
||||
|
@ -144,7 +84,7 @@ public class TestServerCrashProcedureStuck {
|
|||
ProcedureExecutor<MasterProcedureEnv> executor = master.getMasterProcedureExecutor();
|
||||
DummyRegionProcedure proc = new DummyRegionProcedure(executor.getEnvironment(), hri);
|
||||
long procId = master.getMasterProcedureExecutor().submitProcedure(proc);
|
||||
ARRIVE.await();
|
||||
proc.waitUntilArrive();
|
||||
try (AsyncConnection conn =
|
||||
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get()) {
|
||||
AsyncAdmin admin = conn.getAdmin();
|
||||
|
@ -155,7 +95,7 @@ public class TestServerCrashProcedureStuck {
|
|||
() -> executor.getProcedures().stream().filter(p -> p instanceof AssignProcedure)
|
||||
.map(p -> (AssignProcedure) p)
|
||||
.anyMatch(p -> Bytes.equals(hri.getRegionName(), p.getRegionInfo().getRegionName())));
|
||||
RESUME.countDown();
|
||||
proc.resume();
|
||||
UTIL.waitFor(30000, () -> executor.isFinished(procId));
|
||||
// see whether the move region procedure can finish properly
|
||||
future.get(30, TimeUnit.SECONDS);
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.util.List;
|
|||
import java.util.TreeSet;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -53,7 +52,6 @@ import org.apache.hadoop.hbase.master.MasterMetaBootstrap;
|
|||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.TableStateManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
|
@ -98,8 +96,7 @@ public class MasterProcedureTestingUtility {
|
|||
public Void call() throws Exception {
|
||||
final AssignmentManager am = env.getAssignmentManager();
|
||||
am.start();
|
||||
MasterMetaBootstrap metaBootstrap = new MasterMetaBootstrap(master,
|
||||
TaskMonitor.get().createStatus("meta"));
|
||||
MasterMetaBootstrap metaBootstrap = new MasterMetaBootstrap(master);
|
||||
metaBootstrap.recoverMeta();
|
||||
metaBootstrap.processDeadServers();
|
||||
am.joinCluster();
|
||||
|
|
Loading…
Reference in New Issue