HBASE-22074 Should use procedure store to persist the state in reportRegionStateTransition
This commit is contained in:
parent
f8524b8f8e
commit
5f6143ebde
|
@ -3003,28 +3003,32 @@ public final class ProtobufUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Create a CloseRegionRequest for a given region name
|
||||
*
|
||||
* @param regionName the name of the region to close
|
||||
* @return a CloseRegionRequest
|
||||
*/
|
||||
public static CloseRegionRequest buildCloseRegionRequest(ServerName server,
|
||||
final byte[] regionName) {
|
||||
return ProtobufUtil.buildCloseRegionRequest(server, regionName, null);
|
||||
}
|
||||
* Create a CloseRegionRequest for a given region name
|
||||
* @param regionName the name of the region to close
|
||||
* @return a CloseRegionRequest
|
||||
*/
|
||||
public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName) {
|
||||
return ProtobufUtil.buildCloseRegionRequest(server, regionName, null);
|
||||
}
|
||||
|
||||
public static CloseRegionRequest buildCloseRegionRequest(ServerName server,
|
||||
final byte[] regionName, ServerName destinationServer) {
|
||||
public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
|
||||
ServerName destinationServer) {
|
||||
return buildCloseRegionRequest(server, regionName, destinationServer, -1);
|
||||
}
|
||||
|
||||
public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
|
||||
ServerName destinationServer, long closeProcId) {
|
||||
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
|
||||
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
|
||||
RegionSpecifierType.REGION_NAME, regionName);
|
||||
RegionSpecifier region =
|
||||
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
|
||||
builder.setRegion(region);
|
||||
if (destinationServer != null){
|
||||
if (destinationServer != null) {
|
||||
builder.setDestinationServer(toServerName(destinationServer));
|
||||
}
|
||||
if (server != null) {
|
||||
builder.setServerStartCode(server.getStartcode());
|
||||
}
|
||||
builder.setCloseProcId(closeProcId);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -940,27 +940,6 @@ public final class RequestConverter {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer OpenRegionRequest to open a list of regions
|
||||
* @param server the serverName for the RPC
|
||||
* @param regionOpenInfos info of a list of regions to open
|
||||
* @return a protocol buffer OpenRegionRequest
|
||||
*/
|
||||
public static OpenRegionRequest buildOpenRegionRequest(ServerName server,
|
||||
final List<Pair<RegionInfo, List<ServerName>>> regionOpenInfos) {
|
||||
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
|
||||
for (Pair<RegionInfo, List<ServerName>> regionOpenInfo : regionOpenInfos) {
|
||||
builder.addOpenInfo(buildRegionOpenInfo(regionOpenInfo.getFirst(),
|
||||
regionOpenInfo.getSecond()));
|
||||
}
|
||||
if (server != null) {
|
||||
builder.setServerStartCode(server.getStartcode());
|
||||
}
|
||||
// send the master's wall clock time as well, so that the RS can refer to it
|
||||
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer OpenRegionRequest for a given region
|
||||
* @param server the serverName for the RPC
|
||||
|
@ -971,7 +950,7 @@ public final class RequestConverter {
|
|||
public static OpenRegionRequest buildOpenRegionRequest(ServerName server,
|
||||
final RegionInfo region, List<ServerName> favoredNodes) {
|
||||
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
|
||||
builder.addOpenInfo(buildRegionOpenInfo(region, favoredNodes));
|
||||
builder.addOpenInfo(buildRegionOpenInfo(region, favoredNodes, -1L));
|
||||
if (server != null) {
|
||||
builder.setServerStartCode(server.getStartcode());
|
||||
}
|
||||
|
@ -1622,8 +1601,8 @@ public final class RequestConverter {
|
|||
/**
|
||||
* Create a RegionOpenInfo based on given region info and version of offline node
|
||||
*/
|
||||
public static RegionOpenInfo buildRegionOpenInfo(
|
||||
final RegionInfo region, final List<ServerName> favoredNodes) {
|
||||
public static RegionOpenInfo buildRegionOpenInfo(RegionInfo region, List<ServerName> favoredNodes,
|
||||
long openProcId) {
|
||||
RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder();
|
||||
builder.setRegion(ProtobufUtil.toRegionInfo(region));
|
||||
if (favoredNodes != null) {
|
||||
|
@ -1631,6 +1610,7 @@ public final class RequestConverter {
|
|||
builder.addFavoredNodes(ProtobufUtil.toServerName(server));
|
||||
}
|
||||
}
|
||||
builder.setOpenProcId(openProcId);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -88,6 +88,7 @@ message OpenRegionRequest {
|
|||
repeated ServerName favored_nodes = 3;
|
||||
// open region for distributedLogReplay
|
||||
// optional bool DEPRECATED_openForDistributedLogReplay = 4;
|
||||
optional int64 open_proc_id = 5 [default = -1];
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,7 +103,6 @@ message OpenRegionResponse {
|
|||
}
|
||||
|
||||
message WarmupRegionRequest {
|
||||
|
||||
required RegionInfo regionInfo = 1;
|
||||
}
|
||||
|
||||
|
@ -120,6 +120,7 @@ message CloseRegionRequest {
|
|||
optional ServerName destination_server = 4;
|
||||
// the intended server for this RPC.
|
||||
optional uint64 serverStartCode = 5;
|
||||
optional int64 close_proc_id = 6 [default = -1];
|
||||
}
|
||||
|
||||
message CloseRegionResponse {
|
||||
|
|
|
@ -28,6 +28,7 @@ import "HBase.proto";
|
|||
import "RPC.proto";
|
||||
import "Snapshot.proto";
|
||||
import "Replication.proto";
|
||||
import "RegionServerStatus.proto";
|
||||
|
||||
// ============================================================================
|
||||
// WARNING - Compatibility rules
|
||||
|
@ -548,9 +549,19 @@ message RegionStateTransitionStateData {
|
|||
required bool force_new_plan = 3;
|
||||
}
|
||||
|
||||
enum RegionRemoteProcedureBaseState {
|
||||
REGION_REMOTE_PROCEDURE_DISPATCH = 1;
|
||||
REGION_REMOTE_PROCEDURE_REPORT_SUCCEED = 2;
|
||||
REGION_REMOTE_PROCEDURE_DISPATCH_FAIL = 3;
|
||||
REGION_REMOTE_PROCEDURE_SERVER_CRASH = 4;
|
||||
}
|
||||
|
||||
message RegionRemoteProcedureBaseStateData {
|
||||
required RegionInfo region = 1;
|
||||
required ServerName target_server = 2;
|
||||
required RegionRemoteProcedureBaseState state = 3;
|
||||
optional RegionStateTransition.TransitionCode transition_code = 4;
|
||||
optional int64 seq_id = 5;
|
||||
}
|
||||
|
||||
message OpenRegionProcedureStateData {
|
||||
|
|
|
@ -96,6 +96,7 @@ message RegionStateTransition {
|
|||
/** For newly opened region, the open seq num is needed */
|
||||
optional uint64 open_seq_num = 3;
|
||||
|
||||
repeated int64 proc_id = 4;
|
||||
enum TransitionCode {
|
||||
OPENED = 0;
|
||||
FAILED_OPEN = 1;
|
||||
|
|
|
@ -21,7 +21,6 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
|
@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
|||
* @deprecated Do not use any more.
|
||||
* @see TransitRegionStateProcedure
|
||||
*/
|
||||
// TODO: Add being able to assign a region to open read-only.
|
||||
@Deprecated
|
||||
@InterfaceAudience.Private
|
||||
public class AssignProcedure extends RegionTransitionProcedure {
|
||||
|
@ -121,9 +119,7 @@ public class AssignProcedure extends RegionTransitionProcedure {
|
|||
@Override
|
||||
public RemoteOperation remoteCallBuild(final MasterProcedureEnv env,
|
||||
final ServerName serverName) {
|
||||
assert serverName.equals(getRegionState(env).getRegionLocation());
|
||||
return new RegionOpenOperation(this, getRegionInfo(),
|
||||
env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -835,8 +835,10 @@ public class AssignmentManager {
|
|||
case CLOSED:
|
||||
assert transition.getRegionInfoCount() == 1 : transition;
|
||||
final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
|
||||
long procId =
|
||||
transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
|
||||
updateRegionTransition(serverName, transition.getTransitionCode(), hri,
|
||||
transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
|
||||
transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
|
||||
break;
|
||||
case READY_TO_SPLIT:
|
||||
case SPLIT:
|
||||
|
@ -903,7 +905,7 @@ public class AssignmentManager {
|
|||
}
|
||||
|
||||
private void updateRegionTransition(ServerName serverName, TransitionCode state,
|
||||
RegionInfo regionInfo, long seqId) throws IOException {
|
||||
RegionInfo regionInfo, long seqId, long procId) throws IOException {
|
||||
checkMetaLoaded(regionInfo);
|
||||
|
||||
RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
|
||||
|
@ -919,7 +921,7 @@ public class AssignmentManager {
|
|||
ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
|
||||
regionNode.lock();
|
||||
try {
|
||||
if (!reportTransition(regionNode, serverNode, state, seqId)) {
|
||||
if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
|
||||
// Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
|
||||
// 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
|
||||
// rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
|
||||
|
@ -941,14 +943,14 @@ public class AssignmentManager {
|
|||
}
|
||||
|
||||
private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
|
||||
TransitionCode state, long seqId) throws IOException {
|
||||
TransitionCode state, long seqId, long procId) throws IOException {
|
||||
ServerName serverName = serverNode.getServerName();
|
||||
TransitRegionStateProcedure proc = regionNode.getProcedure();
|
||||
if (proc == null) {
|
||||
return false;
|
||||
}
|
||||
proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
|
||||
serverName, state, seqId);
|
||||
serverName, state, seqId, procId);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,15 +20,17 @@ package org.apache.hadoop.hbase.master.assignment;
|
|||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseRegionProcedureStateData;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
|
||||
/**
|
||||
* The remote procedure used to close a region.
|
||||
|
@ -46,9 +48,9 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
|||
super();
|
||||
}
|
||||
|
||||
public CloseRegionProcedure(RegionInfo region, ServerName targetServer,
|
||||
ServerName assignCandidate) {
|
||||
super(region, targetServer);
|
||||
public CloseRegionProcedure(TransitRegionStateProcedure parent, RegionInfo region,
|
||||
ServerName targetServer, ServerName assignCandidate) {
|
||||
super(parent, region, targetServer);
|
||||
this.assignCandidate = assignCandidate;
|
||||
}
|
||||
|
||||
|
@ -59,7 +61,7 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
|||
|
||||
@Override
|
||||
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
|
||||
return new RegionCloseOperation(this, region, assignCandidate);
|
||||
return new RegionCloseOperation(this, region, getProcId(), assignCandidate);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -88,7 +90,17 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldDispatch(RegionStateNode regionNode) {
|
||||
return regionNode.isInState(RegionState.State.CLOSING);
|
||||
protected void reportTransition(RegionStateNode regionNode, TransitionCode transitionCode,
|
||||
long seqId) throws IOException {
|
||||
if (transitionCode != TransitionCode.CLOSED) {
|
||||
throw new UnexpectedStateException("Received report unexpected " + transitionCode +
|
||||
" transition, " + regionNode.toShortString() + ", " + this + ", expected CLOSED.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateTransition(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
TransitionCode transitionCode, long seqId) throws IOException {
|
||||
env.getAssignmentManager().regionClosed(regionNode, true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,15 +20,18 @@ package org.apache.hadoop.hbase.master.assignment;
|
|||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.OpenRegionProcedureStateData;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
|
||||
/**
|
||||
* The remote procedure used to open a region.
|
||||
|
@ -36,12 +39,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.O
|
|||
@InterfaceAudience.Private
|
||||
public class OpenRegionProcedure extends RegionRemoteProcedureBase {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(OpenRegionProcedure.class);
|
||||
|
||||
public OpenRegionProcedure() {
|
||||
super();
|
||||
}
|
||||
|
||||
public OpenRegionProcedure(RegionInfo region, ServerName targetServer) {
|
||||
super(region, targetServer);
|
||||
public OpenRegionProcedure(TransitRegionStateProcedure parent, RegionInfo region,
|
||||
ServerName targetServer) {
|
||||
super(parent, region, targetServer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -51,8 +57,7 @@ public class OpenRegionProcedure extends RegionRemoteProcedureBase {
|
|||
|
||||
@Override
|
||||
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
|
||||
return new RegionOpenOperation(this, region, env.getAssignmentManager().getFavoredNodes(region),
|
||||
false);
|
||||
return new RegionOpenOperation(this, region, getProcId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -73,7 +78,48 @@ public class OpenRegionProcedure extends RegionRemoteProcedureBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldDispatch(RegionStateNode regionNode) {
|
||||
return regionNode.isInState(RegionState.State.OPENING);
|
||||
protected void reportTransition(RegionStateNode regionNode, TransitionCode transitionCode,
|
||||
long seqId) throws IOException {
|
||||
switch (transitionCode) {
|
||||
case OPENED:
|
||||
// this is the openSeqNum
|
||||
if (seqId < 0) {
|
||||
throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED +
|
||||
" transition openSeqNum=" + seqId + ", " + regionNode + ", proc=" + this);
|
||||
}
|
||||
break;
|
||||
case FAILED_OPEN:
|
||||
// nothing to check
|
||||
break;
|
||||
default:
|
||||
throw new UnexpectedStateException(
|
||||
"Received report unexpected " + transitionCode + " transition, " +
|
||||
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateTransition(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
TransitionCode transitionCode, long openSeqNum) throws IOException {
|
||||
switch (transitionCode) {
|
||||
case OPENED:
|
||||
if (openSeqNum < regionNode.getOpenSeqNum()) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={} but the new openSeqNum {}" +
|
||||
" is less than the current one {}, ignoring...",
|
||||
transitionCode, targetServer, regionNode, getProcId(), openSeqNum,
|
||||
regionNode.getOpenSeqNum());
|
||||
} else {
|
||||
regionNode.setOpenSeqNum(openSeqNum);
|
||||
}
|
||||
env.getAssignmentManager().regionOpened(regionNode);
|
||||
break;
|
||||
case FAILED_OPEN:
|
||||
env.getAssignmentManager().regionFailedOpen(regionNode, false);
|
||||
break;
|
||||
default:
|
||||
throw new UnexpectedStateException("Unexpected transition code: " + transitionCode);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,11 @@
|
|||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
|
||||
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
|
||||
|
@ -28,6 +30,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||
|
@ -36,7 +39,10 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseStateData;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
|
||||
/**
|
||||
* The base class for the remote procedures used to open/close a region.
|
||||
|
@ -53,16 +59,25 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
|
||||
protected RegionInfo region;
|
||||
|
||||
private ServerName targetServer;
|
||||
protected ServerName targetServer;
|
||||
|
||||
private boolean dispatched;
|
||||
private RegionRemoteProcedureBaseState state =
|
||||
RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
|
||||
|
||||
private TransitionCode transitionCode;
|
||||
|
||||
private long seqId;
|
||||
|
||||
private int attempt;
|
||||
|
||||
protected RegionRemoteProcedureBase() {
|
||||
}
|
||||
|
||||
protected RegionRemoteProcedureBase(RegionInfo region, ServerName targetServer) {
|
||||
protected RegionRemoteProcedureBase(TransitRegionStateProcedure parent, RegionInfo region,
|
||||
ServerName targetServer) {
|
||||
this.region = region;
|
||||
this.targetServer = targetServer;
|
||||
parent.attachRemoteProc(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -86,22 +101,26 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
RegionStateNode regionNode = getRegionNode(env);
|
||||
regionNode.lock();
|
||||
try {
|
||||
LOG.warn("The remote operation {} for region {} to server {} failed", this, region,
|
||||
targetServer, exception);
|
||||
// This could happen as the RSProcedureDispatcher and dead server processor are executed in
|
||||
// different threads. It is possible that we have already scheduled SCP for the targetServer
|
||||
// and woken up this procedure, and assigned the region to another RS, and then the
|
||||
// RSProcedureDispatcher notices that the targetServer is dead so it can not send the request
|
||||
// out and call remoteCallFailed, which makes us arrive here, especially if the target
|
||||
// machine is completely down, which means you can only receive a ConnectionTimeout after a
|
||||
// very long time(depends on the timeout settings and in HBase usually it will be at least 15
|
||||
// seconds, or even 1 minute). So here we need to check whether we are still waiting on the
|
||||
// given event, if not, this means that we have already been woken up so do not wake it up
|
||||
// again.
|
||||
if (!regionNode.getProcedureEvent().wakeIfSuspended(env.getProcedureScheduler(), this)) {
|
||||
LOG.warn("{} is not waiting on the event for region {}, targer server = {}, ignore.", this,
|
||||
region, targetServer);
|
||||
if (!env.getMasterServices().getServerManager().isServerOnline(remote)) {
|
||||
// the SCP will interrupt us, give up
|
||||
LOG.debug("{} for region {}, targetServer {} is dead, SCP will interrupt us, give up", this,
|
||||
regionNode, remote);
|
||||
return;
|
||||
}
|
||||
if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
|
||||
// not sure how this can happen but anyway let's add a check here to avoid waking the wrong
|
||||
// procedure...
|
||||
LOG.warn("{} for region {}, targetServer={} has already been woken up, ignore", this,
|
||||
regionNode, remote);
|
||||
return;
|
||||
}
|
||||
LOG.warn("The remote operation {} for region {} to server {} failed", this, regionNode,
|
||||
remote, exception);
|
||||
// It is OK to not persist the state here, as we do not need to change the region state if the
|
||||
// remote call fails. If the master crashes before we actually execute the procedure and
|
||||
// persist the new state, it is fine to retry on the same target server again.
|
||||
state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH_FAIL;
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
} finally {
|
||||
regionNode.unlock();
|
||||
}
|
||||
|
@ -133,46 +152,127 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether we still need to make the call to RS.
|
||||
* <p/>
|
||||
* This could happen when master restarts. Since we do not know whether a request has already been
|
||||
* sent to the region server after we add a remote operation to the dispatcher, so the safe way is
|
||||
* to not persist the dispatched field and try to add the remote operation again. But it is
|
||||
* possible that we have already sent the request to the region server and it has also sent back
|
||||
* the response, so here we need to check the region state, if it is not in the expected state,
|
||||
* we should give up, otherwise we may hang for ever, as the region server will just ignore
|
||||
* redundant calls.
|
||||
*/
|
||||
protected abstract boolean shouldDispatch(RegionStateNode regionNode);
|
||||
// do some checks to see if the report is valid, without actually updating meta.
|
||||
protected abstract void reportTransition(RegionStateNode regionNode,
|
||||
TransitionCode transitionCode, long seqId) throws IOException;
|
||||
|
||||
// A bit strange but the procedure store will throw RuntimeException if we can not persist the
|
||||
// state, so upper layer should take care of this...
|
||||
private void persistAndWake(MasterProcedureEnv env, RegionStateNode regionNode) {
|
||||
env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
}
|
||||
|
||||
// should be called with RegionStateNode locked, to avoid race with the execute method below
|
||||
void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName,
|
||||
TransitionCode transitionCode, long seqId) throws IOException {
|
||||
if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
|
||||
// should be a retry
|
||||
return;
|
||||
}
|
||||
if (!targetServer.equals(serverName)) {
|
||||
throw new UnexpectedStateException("Received report from " + serverName + ", expected " +
|
||||
targetServer + ", " + regionNode + ", proc=" + this);
|
||||
}
|
||||
reportTransition(regionNode, transitionCode, seqId);
|
||||
// this state means we have received the report from RS; it does not mean the result is fine, as we
|
||||
// may have received a FAILED_OPEN.
|
||||
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED;
|
||||
this.transitionCode = transitionCode;
|
||||
this.seqId = seqId;
|
||||
// Persist the transition code and openSeqNum(if provided).
|
||||
// We should not update the hbase:meta directly as this may cause races when master restarts,
|
||||
// as the old active master may incorrectly report back to RS and cause the new master to hang
|
||||
// on an OpenRegionProcedure forever. See HBASE-22060 and HBASE-22074 for more details.
|
||||
boolean succ = false;
|
||||
try {
|
||||
persistAndWake(env, regionNode);
|
||||
succ = true;
|
||||
} finally {
|
||||
if (!succ) {
|
||||
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
|
||||
this.transitionCode = null;
|
||||
this.seqId = HConstants.NO_SEQNUM;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName) {
|
||||
if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
|
||||
// should be a retry
|
||||
return;
|
||||
}
|
||||
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH;
|
||||
boolean succ = false;
|
||||
try {
|
||||
persistAndWake(env, regionNode);
|
||||
succ = true;
|
||||
} finally {
|
||||
if (!succ) {
|
||||
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private TransitRegionStateProcedure getParent(MasterProcedureEnv env) {
|
||||
return (TransitRegionStateProcedure) env.getMasterServices().getMasterProcedureExecutor()
|
||||
.getProcedure(getParentProcId());
|
||||
}
|
||||
|
||||
private void unattach(MasterProcedureEnv env) {
|
||||
getParent(env).unattachRemoteProc(this);
|
||||
}
|
||||
|
||||
// actually update the state to meta
|
||||
protected abstract void updateTransition(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
TransitionCode transitionCode, long seqId) throws IOException;
|
||||
|
||||
@Override
|
||||
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
if (dispatched) {
|
||||
// we are done, the parent procedure will check whether we are succeeded.
|
||||
return null;
|
||||
}
|
||||
RegionStateNode regionNode = getRegionNode(env);
|
||||
regionNode.lock();
|
||||
try {
|
||||
if (!shouldDispatch(regionNode)) {
|
||||
return null;
|
||||
switch (state) {
|
||||
case REGION_REMOTE_PROCEDURE_DISPATCH: {
|
||||
// The code which wakes us up also needs to lock the RSN so here we do not need to
|
||||
// synchronize
|
||||
// on the event.
|
||||
ProcedureEvent<?> event = regionNode.getProcedureEvent();
|
||||
try {
|
||||
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
|
||||
} catch (FailedRemoteDispatchException e) {
|
||||
LOG.warn("Can not add remote operation {} for region {} to server {}, this usually " +
|
||||
"because the server is alread dead, give up and mark the procedure as complete, " +
|
||||
"the parent procedure will take care of this.", this, region, targetServer, e);
|
||||
unattach(env);
|
||||
return null;
|
||||
}
|
||||
event.suspend();
|
||||
event.suspendIfNotReady(this);
|
||||
throw new ProcedureSuspendedException();
|
||||
}
|
||||
case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
|
||||
updateTransition(env, regionNode, transitionCode, seqId);
|
||||
unattach(env);
|
||||
return null;
|
||||
case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
|
||||
// the remote call failed, so we do not need to change the region state, just return.
|
||||
unattach(env);
|
||||
return null;
|
||||
case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
|
||||
env.getAssignmentManager().regionClosed(regionNode, false);
|
||||
unattach(env);
|
||||
return null;
|
||||
default:
|
||||
throw new IllegalStateException("Unknown state: " + state);
|
||||
}
|
||||
// The code which wakes us up also needs to lock the RSN so here we do not need to synchronize
|
||||
// on the event.
|
||||
ProcedureEvent<?> event = regionNode.getProcedureEvent();
|
||||
try {
|
||||
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
|
||||
} catch (FailedRemoteDispatchException e) {
|
||||
LOG.warn("Can not add remote operation {} for region {} to server {}, this usually " +
|
||||
"because the server is alread dead, give up and mark the procedure as complete, " +
|
||||
"the parent procedure will take care of this.", this, region, targetServer, e);
|
||||
return null;
|
||||
}
|
||||
dispatched = true;
|
||||
event.suspend();
|
||||
event.suspendIfNotReady(this);
|
||||
} catch (IOException e) {
|
||||
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
|
||||
LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
|
||||
setTimeout(Math.toIntExact(backoff));
|
||||
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||
skipPersistence();
|
||||
throw new ProcedureSuspendedException();
|
||||
} finally {
|
||||
regionNode.unlock();
|
||||
|
@ -186,9 +286,14 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
serializer.serialize(
|
||||
RegionRemoteProcedureBaseStateData.Builder builder =
|
||||
RegionRemoteProcedureBaseStateData.newBuilder().setRegion(ProtobufUtil.toRegionInfo(region))
|
||||
.setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
|
||||
.setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state);
|
||||
if (transitionCode != null) {
|
||||
builder.setTransitionCode(transitionCode);
|
||||
builder.setSeqId(seqId);
|
||||
}
|
||||
serializer.serialize(builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -197,5 +302,15 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
|||
serializer.deserialize(RegionRemoteProcedureBaseStateData.class);
|
||||
region = ProtobufUtil.toRegionInfo(data.getRegion());
|
||||
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
|
||||
state = data.getState();
|
||||
if (data.hasTransitionCode()) {
|
||||
transitionCode = data.getTransitionCode();
|
||||
seqId = data.getSeqId();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterReplay(MasterProcedureEnv env) {
|
||||
getParent(env).attachRemoteProc(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,9 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED;
|
||||
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
|
@ -28,7 +25,6 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
|
||||
|
@ -121,6 +117,8 @@ public class TransitRegionStateProcedure
|
|||
|
||||
private int attempt;
|
||||
|
||||
private RegionRemoteProcedureBase remoteProc;
|
||||
|
||||
public TransitRegionStateProcedure() {
|
||||
}
|
||||
|
||||
|
@ -143,6 +141,7 @@ public class TransitRegionStateProcedure
|
|||
throw new IllegalArgumentException("Unknown TransitionType: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
|
||||
ServerName assignCandidate, boolean forceNewPlan, TransitionType type) {
|
||||
|
@ -204,21 +203,18 @@ public class TransitRegionStateProcedure
|
|||
return;
|
||||
}
|
||||
env.getAssignmentManager().regionOpening(regionNode);
|
||||
addChildProcedure(new OpenRegionProcedure(getRegion(), loc));
|
||||
addChildProcedure(new OpenRegionProcedure(this, getRegion(), loc));
|
||||
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED);
|
||||
}
|
||||
|
||||
private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode)
|
||||
throws IOException {
|
||||
// notice that, for normal case, if we successfully opened a region, we will not arrive here, as
|
||||
// in reportTransition we will call unsetProcedure, and in executeFromState we will return
|
||||
// directly. But if the master is crashed before we finish the procedure, then next time we will
|
||||
// arrive here. So we still need to add code for normal cases.
|
||||
if (regionNode.isInState(State.OPEN)) {
|
||||
attempt = 0;
|
||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
||||
// we are the last state, finish
|
||||
regionNode.unsetProcedure(this);
|
||||
ServerCrashProcedure.updateProgress(env, getParentProcId());
|
||||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
// It is possible that we arrive here but confirm opened is not the last state, for example,
|
||||
|
@ -250,8 +246,8 @@ public class TransitRegionStateProcedure
|
|||
if (regionNode.isInState(State.OPEN, State.CLOSING, State.MERGING, State.SPLITTING)) {
|
||||
// this is the normal case
|
||||
env.getAssignmentManager().regionClosing(regionNode);
|
||||
addChildProcedure(
|
||||
new CloseRegionProcedure(getRegion(), regionNode.getRegionLocation(), assignCandidate));
|
||||
addChildProcedure(new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
|
||||
assignCandidate));
|
||||
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED);
|
||||
} else {
|
||||
forceNewPlan = true;
|
||||
|
@ -262,10 +258,6 @@ public class TransitRegionStateProcedure
|
|||
|
||||
private Flow confirmClosed(MasterProcedureEnv env, RegionStateNode regionNode)
|
||||
throws IOException {
|
||||
// notice that, for normal case, if we successfully opened a region, we will not arrive here, as
|
||||
// in reportTransition we will call unsetProcedure, and in executeFromState we will return
|
||||
// directly. But if the master is crashed before we finish the procedure, then next time we will
|
||||
// arrive here. So we still need to add code for normal cases.
|
||||
if (regionNode.isInState(State.CLOSED)) {
|
||||
attempt = 0;
|
||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
|
||||
|
@ -324,14 +316,6 @@ public class TransitRegionStateProcedure
|
|||
protected Flow executeFromState(MasterProcedureEnv env, RegionStateTransitionState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
RegionStateNode regionNode = getRegionStateNode(env);
|
||||
if (regionNode.getProcedure() != this) {
|
||||
// This is possible, and is the normal case, as we will call unsetProcedure in
|
||||
// reportTransition, this means we have already done
|
||||
// This is because that, when we mark the region as OPENED or CLOSED, then all the works
|
||||
// should have already been done, and logically we could have another TRSP scheduled for this
|
||||
// region immediately(think of a RS crash at the point...).
|
||||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
try {
|
||||
switch (state) {
|
||||
case REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE:
|
||||
|
@ -373,173 +357,49 @@ public class TransitRegionStateProcedure
|
|||
return false; // 'false' means that this procedure handled the timeout
|
||||
}
|
||||
|
||||
private boolean isOpening(RegionStateNode regionNode, ServerName serverName,
|
||||
TransitionCode code) {
|
||||
if (!regionNode.isInState(State.OPENING)) {
|
||||
LOG.warn("Received report {} transition from {} for {}, pid={}, but the region is not in" +
|
||||
" OPENING state, should be a retry, ignore", code, serverName, regionNode, getProcId());
|
||||
return false;
|
||||
}
|
||||
if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={}," +
|
||||
" but the TRSP is not in {} state, should be a retry, ignore",
|
||||
code, serverName, regionNode, getProcId(), REGION_STATE_TRANSITION_CONFIRM_OPENED);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void reportTransitionOpen(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
ServerName serverName, long openSeqNum) throws IOException {
|
||||
if (!isOpening(regionNode, serverName, TransitionCode.OPENED)) {
|
||||
return;
|
||||
}
|
||||
if (openSeqNum < 0) {
|
||||
throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED +
|
||||
" transition openSeqNum=" + openSeqNum + ", " + regionNode + ", proc=" + this);
|
||||
}
|
||||
if (openSeqNum < regionNode.getOpenSeqNum()) {
|
||||
// use the openSeqNum as a fence, if this is not a retry, then the openSeqNum should be
|
||||
// greater than or equal to the existing one.
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={} but the new openSeqNum {}" +
|
||||
" is less than the current one {}, should be a retry, ignore",
|
||||
TransitionCode.OPENED, serverName, regionNode, getProcId(), openSeqNum,
|
||||
regionNode.getOpenSeqNum());
|
||||
return;
|
||||
}
|
||||
// notice that it is possible for a region to still have the same openSeqNum if it crashes and
|
||||
// we haven't written anything into it. That's why we can not just change the above condition
|
||||
// from '<' to '<='. So here we still need to check whether the serverName
|
||||
// matches, to determine whether this is a retry when the openSeqNum is not changed.
|
||||
if (!regionNode.getRegionLocation().equals(serverName)) {
|
||||
LOG.warn("Received report {} transition from {} for {}, pid={} but the region is not on it," +
|
||||
" should be a retry, ignore", TransitionCode.OPENED, serverName, regionNode, getProcId());
|
||||
return;
|
||||
}
|
||||
regionNode.setOpenSeqNum(openSeqNum);
|
||||
env.getAssignmentManager().regionOpened(regionNode);
|
||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
|
||||
// if parent procedure is ServerCrashProcedure, update progress
|
||||
ServerCrashProcedure.updateProgress(env, getParentProcId());
|
||||
// we are done
|
||||
regionNode.unsetProcedure(this);
|
||||
}
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
}
|
||||
|
||||
private void reportTransitionFailedOpen(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
ServerName serverName) {
|
||||
if (!isOpening(regionNode, serverName, TransitionCode.FAILED_OPEN)) {
|
||||
return;
|
||||
}
|
||||
// there is no openSeqNum for FAILED_OPEN, so we will check the target server instead
|
||||
if (!regionNode.getRegionLocation().equals(serverName)) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={}," +
|
||||
" but the region is not on it, should be a retry, ignore",
|
||||
TransitionCode.FAILED_OPEN, regionNode, serverName, getProcId());
|
||||
return;
|
||||
}
|
||||
// just wake up the procedure and see if we can retry
|
||||
// Notice that, even if we arrive here, this call could still be a retry, as we may retry
|
||||
// opening on the same server again. And the assumption here is that, once the region state is
|
||||
// OPENING, and the TRSP state is REGION_STATE_TRANSITION_CONFIRM_OPENED, the TRSP must have
|
||||
// been suspended on the procedure event, so after the waking operation here, the TRSP will be
|
||||
// executed and try to schedule new OpenRegionProcedure again. Once there is a successful open
|
||||
// then we are done, so the TRSP will not be stuck.
|
||||
// TODO: maybe we could send the procedure id of the OpenRegionProcedure to the region server
|
||||
// and let the region server send it back when done, so it will be easy to detect whether this
|
||||
// is a retry.
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
}
|
||||
|
||||
// we do not need seqId for closing a region
|
||||
private void reportTransitionClosed(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
ServerName serverName) throws IOException {
|
||||
if (!regionNode.isInState(State.CLOSING)) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={}" +
|
||||
", but the region is not in CLOSING state, should be a retry, ignore",
|
||||
TransitionCode.CLOSED, serverName, regionNode, getProcId());
|
||||
return;
|
||||
}
|
||||
if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={} but the proc is not in {}" +
|
||||
" state, should be a retry, ignore",
|
||||
TransitionCode.CLOSED, serverName, regionNode, getProcId(),
|
||||
REGION_STATE_TRANSITION_CONFIRM_CLOSED);
|
||||
return;
|
||||
}
|
||||
if (!regionNode.getRegionLocation().equals(serverName)) {
|
||||
LOG.warn(
|
||||
"Received report {} transition from {} for {}, pid={}," +
|
||||
" but the region is not on it, should be a retry, ignore",
|
||||
TransitionCode.CLOSED, serverName, regionNode, getProcId());
|
||||
return;
|
||||
}
|
||||
env.getAssignmentManager().regionClosed(regionNode, true);
|
||||
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
|
||||
// we are done
|
||||
regionNode.unsetProcedure(this);
|
||||
}
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
}
|
||||
|
||||
// Should be called with RegionStateNode locked
|
||||
public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
ServerName serverName, TransitionCode code, long seqId) throws IOException {
|
||||
// It is possible that the previous reportRegionStateTransition call was succeeded at master
|
||||
// side, but before returning the result to region server, the rpc connection was broken, or the
|
||||
// master restarted. The region server will try calling reportRegionStateTransition again under
|
||||
// this scenario, so here we need to check whether this is a retry.
|
||||
switch (code) {
|
||||
case OPENED:
|
||||
reportTransitionOpen(env, regionNode, serverName, seqId);
|
||||
break;
|
||||
case FAILED_OPEN:
|
||||
reportTransitionFailedOpen(env, regionNode, serverName);
|
||||
break;
|
||||
case CLOSED:
|
||||
reportTransitionClosed(env, regionNode, serverName);
|
||||
break;
|
||||
default:
|
||||
throw new UnexpectedStateException("Received report unexpected " + code + " transition, " +
|
||||
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN or CLOSED.");
|
||||
ServerName serverName, TransitionCode code, long seqId, long procId) throws IOException {
|
||||
if (remoteProc == null) {
|
||||
LOG.warn(
|
||||
"There is no outstanding remote region procedure for {}, serverName={}, code={}," +
|
||||
" seqId={}, proc={}, should be a retry, ignore",
|
||||
regionNode, serverName, code, seqId, this);
|
||||
return;
|
||||
}
|
||||
// The procId could be -1 if it is from an old region server, we need to deal with it so that we
|
||||
// can do rolling upgrading.
|
||||
if (procId >= 0 && remoteProc.getProcId() != procId) {
|
||||
LOG.warn(
|
||||
"The pid of remote region procedure for {} is {}, the reported pid={}, serverName={}," +
|
||||
" code={}, seqId={}, proc={}, should be a retry, ignore",
|
||||
regionNode, remoteProc.getProcId(), procId, serverName, code, seqId, this);
|
||||
return;
|
||||
}
|
||||
remoteProc.reportTransition(env, regionNode, serverName, code, seqId);
|
||||
}
|
||||
|
||||
// Should be called with RegionStateNode locked
|
||||
public void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode,
|
||||
ServerName serverName) throws IOException {
|
||||
// Notice that, in this method, we do not change the procedure state, instead, we update the
|
||||
// region state in hbase:meta. This is because that, the procedure state change will not be
|
||||
// persisted until the region is woken up and finish one step, if we crash before that then the
|
||||
// information will be lost. So here we will update the region state in hbase:meta, and when the
|
||||
// procedure is woken up, it will process the error and jump to the correct procedure state.
|
||||
RegionStateTransitionState currentState = getCurrentState();
|
||||
switch (currentState) {
|
||||
case REGION_STATE_TRANSITION_CLOSE:
|
||||
case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
|
||||
case REGION_STATE_TRANSITION_CONFIRM_OPENED:
|
||||
// for these 3 states, the region may still be online on the crashed server
|
||||
env.getAssignmentManager().regionClosed(regionNode, false);
|
||||
if (currentState != RegionStateTransitionState.REGION_STATE_TRANSITION_CLOSE) {
|
||||
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// If the procedure is in other 2 states, then actually we should not arrive here, as we
|
||||
// know that the region is not online on any server, so we need to do nothing... But anyway
|
||||
// let's add a log here
|
||||
LOG.warn("{} received unexpected server crash call for region {} from {}", this, regionNode,
|
||||
serverName);
|
||||
|
||||
if (remoteProc != null) {
|
||||
// this means we are waiting for the sub procedure, so wake it up
|
||||
remoteProc.serverCrashed(env, regionNode, serverName);
|
||||
} else {
|
||||
// we are in RUNNING state, just update the region state, and we will process it later.
|
||||
env.getAssignmentManager().regionClosed(regionNode, false);
|
||||
}
|
||||
}
|
||||
|
||||
void attachRemoteProc(RegionRemoteProcedureBase proc) {
|
||||
this.remoteProc = proc;
|
||||
}
|
||||
|
||||
void unattachRemoteProc(RegionRemoteProcedureBase proc) {
|
||||
assert this.remoteProc == proc;
|
||||
this.remoteProc = null;
|
||||
}
|
||||
|
||||
private boolean incrementAndCheckMaxAttempts(MasterProcedureEnv env, RegionStateNode regionNode) {
|
||||
int retries = env.getAssignmentManager().getRegionStates().addToFailedOpen(regionNode)
|
||||
.incrementAndGetRetries();
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
|
||||
|
@ -129,8 +128,7 @@ public class UnassignProcedure extends RegionTransitionProcedure {
|
|||
@Override
|
||||
public RemoteOperation remoteCallBuild(final MasterProcedureEnv env,
|
||||
final ServerName serverName) {
|
||||
assert serverName.equals(getRegionState(env).getRegionLocation());
|
||||
return new RegionCloseOperation(this, getRegionInfo(), this.destinationServer);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -399,54 +399,36 @@ public class RSProcedureDispatcher
|
|||
}
|
||||
|
||||
public static abstract class RegionOperation extends RemoteOperation {
|
||||
private final RegionInfo regionInfo;
|
||||
protected final RegionInfo regionInfo;
|
||||
protected final long procId;
|
||||
|
||||
protected RegionOperation(final RemoteProcedure remoteProcedure,
|
||||
final RegionInfo regionInfo) {
|
||||
protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
|
||||
super(remoteProcedure);
|
||||
this.regionInfo = regionInfo;
|
||||
}
|
||||
|
||||
public RegionInfo getRegionInfo() {
|
||||
return this.regionInfo;
|
||||
this.procId = procId;
|
||||
}
|
||||
}
|
||||
|
||||
public static class RegionOpenOperation extends RegionOperation {
|
||||
private final List<ServerName> favoredNodes;
|
||||
private final boolean openForReplay;
|
||||
private boolean failedOpen;
|
||||
|
||||
public RegionOpenOperation(final RemoteProcedure remoteProcedure,
|
||||
final RegionInfo regionInfo, final List<ServerName> favoredNodes,
|
||||
final boolean openForReplay) {
|
||||
super(remoteProcedure, regionInfo);
|
||||
this.favoredNodes = favoredNodes;
|
||||
this.openForReplay = openForReplay;
|
||||
}
|
||||
|
||||
protected void setFailedOpen(final boolean failedOpen) {
|
||||
this.failedOpen = failedOpen;
|
||||
}
|
||||
|
||||
public boolean isFailedOpen() {
|
||||
return failedOpen;
|
||||
public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo,
|
||||
long procId) {
|
||||
super(remoteProcedure, regionInfo, procId);
|
||||
}
|
||||
|
||||
public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
|
||||
final MasterProcedureEnv env) {
|
||||
return RequestConverter.buildRegionOpenInfo(getRegionInfo(),
|
||||
env.getAssignmentManager().getFavoredNodes(getRegionInfo()));
|
||||
return RequestConverter.buildRegionOpenInfo(regionInfo,
|
||||
env.getAssignmentManager().getFavoredNodes(regionInfo), procId);
|
||||
}
|
||||
}
|
||||
|
||||
public static class RegionCloseOperation extends RegionOperation {
|
||||
private final ServerName destinationServer;
|
||||
private boolean closed = false;
|
||||
|
||||
public RegionCloseOperation(final RemoteProcedure remoteProcedure,
|
||||
final RegionInfo regionInfo, final ServerName destinationServer) {
|
||||
super(remoteProcedure, regionInfo);
|
||||
public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
|
||||
ServerName destinationServer) {
|
||||
super(remoteProcedure, regionInfo, procId);
|
||||
this.destinationServer = destinationServer;
|
||||
}
|
||||
|
||||
|
@ -454,17 +436,9 @@ public class RSProcedureDispatcher
|
|||
return destinationServer;
|
||||
}
|
||||
|
||||
protected void setClosed(final boolean closed) {
|
||||
this.closed = closed;
|
||||
}
|
||||
|
||||
public boolean isClosed() {
|
||||
return closed;
|
||||
}
|
||||
|
||||
public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
|
||||
return ProtobufUtil.buildCloseRegionRequest(serverName,
|
||||
getRegionInfo().getRegionName(), getDestinationServer());
|
||||
return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
|
||||
getDestinationServer(), procId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2228,9 +2228,11 @@ public class HRegionServer extends HasThread implements
|
|||
@Override
|
||||
public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
|
||||
HRegion r = context.getRegion();
|
||||
long openProcId = context.getOpenProcId();
|
||||
long masterSystemTime = context.getMasterSystemTime();
|
||||
rpcServices.checkOpen();
|
||||
LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
|
||||
LOG.info("Post open deploy tasks for {}, openProcId={}, masterSystemTime={}",
|
||||
r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
|
||||
// Do checks to see if we need to compact (references or too many files)
|
||||
for (HStore s : r.stores.values()) {
|
||||
if (s.hasReferences() || s.needsCompaction()) {
|
||||
|
@ -2247,7 +2249,7 @@ public class HRegionServer extends HasThread implements
|
|||
|
||||
// Notify master
|
||||
if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
|
||||
openSeqNum, masterSystemTime, r.getRegionInfo()))) {
|
||||
openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
|
||||
throw new IOException(
|
||||
"Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
|
||||
}
|
||||
|
@ -2263,6 +2265,7 @@ public class HRegionServer extends HasThread implements
|
|||
long openSeqNum = context.getOpenSeqNum();
|
||||
long masterSystemTime = context.getMasterSystemTime();
|
||||
RegionInfo[] hris = context.getHris();
|
||||
long[] procIds = context.getProcIds();
|
||||
|
||||
if (TEST_SKIP_REPORTING_TRANSITION) {
|
||||
// This is for testing only in case there is no master
|
||||
|
@ -2301,6 +2304,9 @@ public class HRegionServer extends HasThread implements
|
|||
for (RegionInfo hri: hris) {
|
||||
transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
|
||||
}
|
||||
for (long procId: procIds) {
|
||||
transition.addProcId(procId);
|
||||
}
|
||||
ReportRegionStateTransitionRequest request = builder.build();
|
||||
int tries = 0;
|
||||
long pauseTime = INIT_PAUSE_TIME_MS;
|
||||
|
|
|
@ -3714,8 +3714,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
|
||||
regionOpenInfo.getFavoredNodesList());
|
||||
}
|
||||
regionServer.executorService
|
||||
.submit(AssignRegionHandler.create(regionServer, regionInfo, tableDesc, masterSystemTime));
|
||||
regionServer.executorService.submit(AssignRegionHandler.create(regionServer, regionInfo,
|
||||
regionOpenInfo.getOpenProcId(), tableDesc, masterSystemTime));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3729,8 +3729,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
ServerName destination =
|
||||
request.hasDestinationServer() ? ProtobufUtil.toServerName(request.getDestinationServer())
|
||||
: null;
|
||||
regionServer.executorService
|
||||
.submit(UnassignRegionHandler.create(regionServer, encodedName, false, destination));
|
||||
regionServer.executorService.submit(UnassignRegionHandler.create(regionServer, encodedName,
|
||||
request.getCloseProcId(), false, destination));
|
||||
}
|
||||
|
||||
private void executeProcedures(RemoteProcedureRequest request) {
|
||||
|
|
|
@ -100,16 +100,23 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
|
|||
*/
|
||||
class PostOpenDeployContext {
|
||||
private final HRegion region;
|
||||
private final long openProcId;
|
||||
private final long masterSystemTime;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public PostOpenDeployContext(HRegion region, long masterSystemTime) {
|
||||
public PostOpenDeployContext(HRegion region, long openProcId, long masterSystemTime) {
|
||||
this.region = region;
|
||||
this.openProcId = openProcId;
|
||||
this.masterSystemTime = masterSystemTime;
|
||||
}
|
||||
|
||||
public HRegion getRegion() {
|
||||
return region;
|
||||
}
|
||||
|
||||
public long getOpenProcId() {
|
||||
return openProcId;
|
||||
}
|
||||
|
||||
public long getMasterSystemTime() {
|
||||
return masterSystemTime;
|
||||
}
|
||||
|
@ -125,28 +132,46 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
|
|||
private final TransitionCode code;
|
||||
private final long openSeqNum;
|
||||
private final long masterSystemTime;
|
||||
private final long[] procIds;
|
||||
private final RegionInfo[] hris;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public RegionStateTransitionContext(TransitionCode code, long openSeqNum, long masterSystemTime,
|
||||
RegionInfo... hris) {
|
||||
this.code = code;
|
||||
this.openSeqNum = openSeqNum;
|
||||
this.masterSystemTime = masterSystemTime;
|
||||
this.hris = hris;
|
||||
this.procIds = new long[hris.length];
|
||||
}
|
||||
|
||||
public RegionStateTransitionContext(TransitionCode code, long openSeqNum, long procId,
|
||||
long masterSystemTime, RegionInfo hri) {
|
||||
this.code = code;
|
||||
this.openSeqNum = openSeqNum;
|
||||
this.masterSystemTime = masterSystemTime;
|
||||
this.hris = new RegionInfo[] { hri };
|
||||
this.procIds = new long[] { procId };
|
||||
}
|
||||
|
||||
public TransitionCode getCode() {
|
||||
return code;
|
||||
}
|
||||
|
||||
public long getOpenSeqNum() {
|
||||
return openSeqNum;
|
||||
}
|
||||
|
||||
public long getMasterSystemTime() {
|
||||
return masterSystemTime;
|
||||
}
|
||||
|
||||
public RegionInfo[] getHris() {
|
||||
return hris;
|
||||
}
|
||||
|
||||
public long[] getProcIds() {
|
||||
return procIds;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.security.PrivilegedAction;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -30,7 +29,9 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
|
||||
/**
|
||||
|
@ -87,7 +88,7 @@ class SplitRequest implements Runnable {
|
|||
// hri_a and hri_b objects may not reflect the regions that will be created, those objects
|
||||
// are created just to pass the information to the reportRegionStateTransition().
|
||||
if (!server.reportRegionStateTransition(new RegionStateTransitionContext(
|
||||
TransitionCode.READY_TO_SPLIT, HConstants.NO_SEQNUM, -1, parent, hri_a, hri_b))) {
|
||||
TransitionCode.READY_TO_SPLIT, HConstants.NO_SEQNUM, -1, parent, hri_a, hri_b))) {
|
||||
LOG.error("Unable to ask master to split " + parent.getRegionNameAsString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,16 +51,19 @@ public class AssignRegionHandler extends EventHandler {
|
|||
|
||||
private final RegionInfo regionInfo;
|
||||
|
||||
private final long openProcId;
|
||||
|
||||
private final TableDescriptor tableDesc;
|
||||
|
||||
private final long masterSystemTime;
|
||||
|
||||
private final RetryCounter retryCounter;
|
||||
|
||||
public AssignRegionHandler(RegionServerServices server, RegionInfo regionInfo,
|
||||
public AssignRegionHandler(RegionServerServices server, RegionInfo regionInfo, long openProcId,
|
||||
@Nullable TableDescriptor tableDesc, long masterSystemTime, EventType eventType) {
|
||||
super(server, eventType);
|
||||
this.regionInfo = regionInfo;
|
||||
this.openProcId = openProcId;
|
||||
this.tableDesc = tableDesc;
|
||||
this.masterSystemTime = masterSystemTime;
|
||||
this.retryCounter = HandlerUtil.getRetryCounter();
|
||||
|
@ -76,7 +79,7 @@ public class AssignRegionHandler extends EventHandler {
|
|||
RegionServerServices rs = getServer();
|
||||
rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes(), Boolean.TRUE);
|
||||
if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.FAILED_OPEN,
|
||||
HConstants.NO_SEQNUM, masterSystemTime, regionInfo))) {
|
||||
HConstants.NO_SEQNUM, openProcId, masterSystemTime, regionInfo))) {
|
||||
throw new IOException(
|
||||
"Failed to report failed open to master: " + regionInfo.getRegionNameAsString());
|
||||
}
|
||||
|
@ -133,7 +136,7 @@ public class AssignRegionHandler extends EventHandler {
|
|||
cleanUpAndReportFailure(e);
|
||||
return;
|
||||
}
|
||||
rs.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime));
|
||||
rs.postOpenDeployTasks(new PostOpenDeployContext(region, openProcId, masterSystemTime));
|
||||
rs.addRegion(region);
|
||||
LOG.info("Opened {}", regionName);
|
||||
Boolean current = rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes());
|
||||
|
@ -156,7 +159,7 @@ public class AssignRegionHandler extends EventHandler {
|
|||
}
|
||||
|
||||
public static AssignRegionHandler create(RegionServerServices server, RegionInfo regionInfo,
|
||||
TableDescriptor tableDesc, long masterSystemTime) {
|
||||
long openProcId, TableDescriptor tableDesc, long masterSystemTime) {
|
||||
EventType eventType;
|
||||
if (regionInfo.isMetaRegion()) {
|
||||
eventType = EventType.M_RS_CLOSE_META;
|
||||
|
@ -166,6 +169,7 @@ public class AssignRegionHandler extends EventHandler {
|
|||
} else {
|
||||
eventType = EventType.M_RS_OPEN_REGION;
|
||||
}
|
||||
return new AssignRegionHandler(server, regionInfo, tableDesc, masterSystemTime, eventType);
|
||||
return new AssignRegionHandler(server, regionInfo, openProcId, tableDesc, masterSystemTime,
|
||||
eventType);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
|
||||
|
@ -123,7 +124,7 @@ public class CloseRegionHandler extends EventHandler {
|
|||
|
||||
this.rsServices.removeRegion(region, destination);
|
||||
rsServices.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.CLOSED,
|
||||
HConstants.NO_SEQNUM, -1, regionInfo));
|
||||
HConstants.NO_SEQNUM, Procedure.NO_PROC_ID, -1, regionInfo));
|
||||
|
||||
// Done! Region is closed on this RS
|
||||
LOG.debug("Closed " + region.getRegionInfo().getRegionNameAsString());
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
|
@ -156,15 +157,14 @@ public class OpenRegionHandler extends EventHandler {
|
|||
}
|
||||
}
|
||||
|
||||
private void doCleanUpOnFailedOpen(HRegion region)
|
||||
throws IOException {
|
||||
private void doCleanUpOnFailedOpen(HRegion region) throws IOException {
|
||||
try {
|
||||
if (region != null) {
|
||||
cleanupFailedOpen(region);
|
||||
}
|
||||
} finally {
|
||||
rsServices.reportRegionStateTransition(new RegionStateTransitionContext(
|
||||
TransitionCode.FAILED_OPEN, HConstants.NO_SEQNUM, -1, regionInfo));
|
||||
TransitionCode.FAILED_OPEN, HConstants.NO_SEQNUM, Procedure.NO_PROC_ID, -1, regionInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -248,19 +248,19 @@ public class OpenRegionHandler extends EventHandler {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
this.services.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime));
|
||||
this.services.postOpenDeployTasks(
|
||||
new PostOpenDeployContext(region, Procedure.NO_PROC_ID, masterSystemTime));
|
||||
} catch (Throwable e) {
|
||||
String msg = "Exception running postOpenDeployTasks; region=" +
|
||||
this.region.getRegionInfo().getEncodedName();
|
||||
this.exception = e;
|
||||
if (e instanceof IOException
|
||||
&& isRegionStillOpening(region.getRegionInfo(), services)) {
|
||||
if (e instanceof IOException && isRegionStillOpening(region.getRegionInfo(), services)) {
|
||||
server.abort(msg, e);
|
||||
} else {
|
||||
LOG.warn(msg, e);
|
||||
}
|
||||
}
|
||||
// We're done. Set flag then wake up anyone waiting on thread to complete.
|
||||
// We're done. Set flag then wake up anyone waiting on thread to complete.
|
||||
this.signaller.set(true);
|
||||
synchronized (this.signaller) {
|
||||
this.signaller.notify();
|
||||
|
|
|
@ -49,18 +49,22 @@ public class UnassignRegionHandler extends EventHandler {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(UnassignRegionHandler.class);
|
||||
|
||||
private final String encodedName;
|
||||
|
||||
private final long closeProcId;
|
||||
// If true, the hosting server is aborting. Region close process is different
|
||||
// when we are aborting.
|
||||
// TODO: not used yet, we still use the old CloseRegionHandler when aborting
|
||||
private final boolean abort;
|
||||
|
||||
private final ServerName destination;
|
||||
|
||||
private final RetryCounter retryCounter;
|
||||
|
||||
public UnassignRegionHandler(RegionServerServices server, String encodedName, boolean abort,
|
||||
@Nullable ServerName destination, EventType eventType) {
|
||||
public UnassignRegionHandler(RegionServerServices server, String encodedName, long closeProcId,
|
||||
boolean abort, @Nullable ServerName destination, EventType eventType) {
|
||||
super(server, eventType);
|
||||
this.encodedName = encodedName;
|
||||
this.closeProcId = closeProcId;
|
||||
this.abort = abort;
|
||||
this.destination = destination;
|
||||
this.retryCounter = HandlerUtil.getRetryCounter();
|
||||
|
@ -117,7 +121,7 @@ public class UnassignRegionHandler extends EventHandler {
|
|||
}
|
||||
rs.removeRegion(region, destination);
|
||||
if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.CLOSED,
|
||||
HConstants.NO_SEQNUM, -1, region.getRegionInfo()))) {
|
||||
HConstants.NO_SEQNUM, closeProcId, -1, region.getRegionInfo()))) {
|
||||
throw new IOException("Failed to report close to master: " + regionName);
|
||||
}
|
||||
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
|
||||
|
@ -131,13 +135,14 @@ public class UnassignRegionHandler extends EventHandler {
|
|||
}
|
||||
|
||||
public static UnassignRegionHandler create(RegionServerServices server, String encodedName,
|
||||
boolean abort, @Nullable ServerName destination) {
|
||||
long closeProcId, boolean abort, @Nullable ServerName destination) {
|
||||
// Just try our best to determine whether it is for closing meta. It is not the end of the world
|
||||
// if we put the handler into a wrong executor.
|
||||
Region region = server.getRegion(encodedName);
|
||||
EventType eventType =
|
||||
region != null && region.getRegionInfo().isMetaRegion() ? EventType.M_RS_CLOSE_META
|
||||
: EventType.M_RS_CLOSE_REGION;
|
||||
return new UnassignRegionHandler(server, encodedName, abort, destination, eventType);
|
||||
return new UnassignRegionHandler(server, encodedName, closeProcId, abort, destination,
|
||||
eventType);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -400,6 +400,13 @@ public abstract class TestAssignmentManagerBase {
|
|||
if (retries == timeoutTimes) {
|
||||
LOG.info("Mark server=" + server + " as dead. retries=" + retries);
|
||||
master.getServerManager().moveFromOnlineToDeadServers(server);
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Sending in CRASH of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
throw new SocketTimeoutException("simulate socket timeout");
|
||||
} else {
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||
|
@ -167,7 +168,7 @@ public class TestCloseRegionWhileRSCrash {
|
|||
HRegionServer dstRs = UTIL.getOtherRegionServer(srcRs);
|
||||
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||
long dummyProcId = procExec.submitProcedure(new DummyServerProcedure(srcRs.getServerName()));
|
||||
procExec.submitProcedure(new DummyServerProcedure(srcRs.getServerName()));
|
||||
ARRIVE.await();
|
||||
UTIL.getMiniHBaseCluster().killRegionServer(srcRs.getServerName());
|
||||
UTIL.waitFor(30000,
|
||||
|
@ -185,13 +186,12 @@ public class TestCloseRegionWhileRSCrash {
|
|||
30000);
|
||||
// wait until the timeout value increase three times
|
||||
ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TransitRegionStateProcedure.class, 3);
|
||||
// let's close the connection to make sure that the SCP can not update meta successfully
|
||||
UTIL.getMiniHBaseCluster().getMaster().getConnection().close();
|
||||
// close connection to make sure that we can not finish the TRSP
|
||||
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
|
||||
master.getConnection().close();
|
||||
RESUME.countDown();
|
||||
UTIL.waitFor(30000, () -> procExec.isFinished(dummyProcId));
|
||||
Thread.sleep(2000);
|
||||
// here we restart
|
||||
UTIL.getMiniHBaseCluster().stopMaster(0).join();
|
||||
UTIL.waitFor(30000, () -> !master.isAlive());
|
||||
// here we start a new master
|
||||
UTIL.getMiniHBaseCluster().startMaster();
|
||||
t.join();
|
||||
// Make sure that the region is online, it may not be on the original target server, as we will set
|
||||
|
|
|
@ -0,0 +1,209 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||
|
||||
/**
|
||||
* See HBASE-22060 and HBASE-22074 for more details.
|
||||
*/
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestOpenRegionProcedureHang {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestOpenRegionProcedureHang.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestOpenRegionProcedureHang.class);
|
||||
|
||||
private static CountDownLatch ARRIVE;
|
||||
private static CountDownLatch RESUME;
|
||||
|
||||
private static CountDownLatch FINISH;
|
||||
|
||||
private static CountDownLatch ABORT;
|
||||
|
||||
private static final class AssignmentManagerForTest extends AssignmentManager {
|
||||
|
||||
public AssignmentManagerForTest(MasterServices master) {
|
||||
super(master);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReportRegionStateTransitionResponse reportRegionStateTransition(
|
||||
ReportRegionStateTransitionRequest req) throws PleaseHoldException {
|
||||
RegionStateTransition transition = req.getTransition(0);
|
||||
if (transition.getTransitionCode() == TransitionCode.OPENED &&
|
||||
ProtobufUtil.toTableName(transition.getRegionInfo(0).getTableName()).equals(NAME) &&
|
||||
ARRIVE != null) {
|
||||
ARRIVE.countDown();
|
||||
try {
|
||||
RESUME.await();
|
||||
RESUME = null;
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
try {
|
||||
return super.reportRegionStateTransition(req);
|
||||
} finally {
|
||||
FINISH.countDown();
|
||||
}
|
||||
} else {
|
||||
return super.reportRegionStateTransition(req);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static final class HMasterForTest extends HMaster {
|
||||
|
||||
public HMasterForTest(Configuration conf) throws IOException, KeeperException {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AssignmentManager createAssignmentManager(MasterServices master) {
|
||||
return new AssignmentManagerForTest(master);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(String reason, Throwable cause) {
|
||||
// hang here so we can finish the reportRegionStateTransition call, which is the most
|
||||
// important part to reproduce the bug
|
||||
if (ABORT != null) {
|
||||
try {
|
||||
ABORT.await();
|
||||
ABORT = null;
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
super.abort(reason, cause);
|
||||
}
|
||||
}
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName NAME = TableName.valueOf("Open");
|
||||
|
||||
private static byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
|
||||
|
||||
// make sure we do not timeout when caling reportRegionStateTransition
|
||||
conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10 * 60 * 1000);
|
||||
conf.setInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 10 * 60 * 1000);
|
||||
UTIL
|
||||
.startMiniCluster(StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).build());
|
||||
UTIL.createTable(NAME, CF);
|
||||
UTIL.waitTableAvailable(NAME);
|
||||
UTIL.getAdmin().balancerSwitch(false, true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws InterruptedException, KeeperException, IOException {
|
||||
RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
|
||||
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
|
||||
|
||||
HRegionServer rs1 = UTIL.getRSForFirstRegionInTable(NAME);
|
||||
HRegionServer rs2 = UTIL.getOtherRegionServer(rs1);
|
||||
|
||||
ARRIVE = new CountDownLatch(1);
|
||||
RESUME = new CountDownLatch(1);
|
||||
FINISH = new CountDownLatch(1);
|
||||
ABORT = new CountDownLatch(1);
|
||||
am.moveAsync(new RegionPlan(region, rs1.getServerName(), rs2.getServerName()));
|
||||
|
||||
ARRIVE.await();
|
||||
ARRIVE = null;
|
||||
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
|
||||
master.getZooKeeper().close();
|
||||
UTIL.waitFor(30000, () -> {
|
||||
for (MasterThread mt : UTIL.getMiniHBaseCluster().getMasterThreads()) {
|
||||
if (mt.getMaster() != master && mt.getMaster().isActiveMaster()) {
|
||||
return mt.getMaster().isInitialized();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
});
|
||||
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||
UTIL.waitFor(30000,
|
||||
() -> procExec.getProcedures().stream().filter(p -> p instanceof OpenRegionProcedure)
|
||||
.map(p -> (OpenRegionProcedure) p).anyMatch(p -> p.region.getTable().equals(NAME)));
|
||||
OpenRegionProcedure proc = procExec.getProcedures().stream()
|
||||
.filter(p -> p instanceof OpenRegionProcedure).map(p -> (OpenRegionProcedure) p)
|
||||
.filter(p -> p.region.getTable().equals(NAME)).findFirst().get();
|
||||
// wait a bit to let the OpenRegionProcedure send out the request
|
||||
Thread.sleep(2000);
|
||||
RESUME.countDown();
|
||||
if (!FINISH.await(15, TimeUnit.SECONDS)) {
|
||||
LOG.info("Wait reportRegionStateTransition to finish timed out, this is possible if" +
|
||||
" we update the procedure store, as the WALProcedureStore" +
|
||||
" will retry forever to roll the writer if it is not closed");
|
||||
}
|
||||
FINISH = null;
|
||||
// if the reportRegionTransition is finished, wait a bit to let it return the data to RS
|
||||
Thread.sleep(2000);
|
||||
ABORT.countDown();
|
||||
|
||||
UTIL.waitFor(30000, () -> procExec.isFinished(proc.getProcId()));
|
||||
UTIL.waitFor(30000, () -> procExec.isFinished(proc.getParentProcId()));
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.NavigableMap;
|
||||
|
@ -31,7 +32,6 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
|
|||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
|
||||
import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure;
|
||||
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
||||
|
@ -62,6 +63,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
|
@ -134,19 +136,21 @@ public class TestServerRemoteProcedure {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testRegionOpenProcedureIsNotHandledByDisPatcher() throws Exception {
|
||||
public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception {
|
||||
TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher");
|
||||
RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1))
|
||||
.setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
|
||||
master.getMasterProcedureExecutor().getEnvironment().getAssignmentManager().getRegionStates()
|
||||
.getOrCreateRegionStateNode(hri);
|
||||
.setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
|
||||
MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
|
||||
env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(hri);
|
||||
TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, hri, null);
|
||||
ServerName worker = master.getServerManager().getOnlineServersList().get(0);
|
||||
OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(hri, worker);
|
||||
OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(proc, hri, worker);
|
||||
Future<byte[]> future = submitProcedure(openRegionProcedure);
|
||||
Thread.sleep(2000);
|
||||
rsDispatcher.removeNode(worker);
|
||||
try {
|
||||
future.get(2000, TimeUnit.MILLISECONDS);
|
||||
fail();
|
||||
} catch (TimeoutException e) {
|
||||
LOG.info("timeout is expected");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue