HBASE-20752 Make sure the regions are truly reopened after ReopenTableRegionsProcedure
This commit is contained in:
parent
bc9f9ae080
commit
7b716c964b
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -3130,6 +3131,22 @@ public final class ProtobufUtil {
|
|||
return rib.build();
|
||||
}
|
||||
|
||||
public static HBaseProtos.RegionLocation toRegionLocation(HRegionLocation loc) {
|
||||
HBaseProtos.RegionLocation.Builder builder = HBaseProtos.RegionLocation.newBuilder();
|
||||
builder.setRegionInfo(toRegionInfo(loc.getRegion()));
|
||||
if (loc.getServerName() != null) {
|
||||
builder.setServerName(toServerName(loc.getServerName()));
|
||||
}
|
||||
builder.setSeqNum(loc.getSeqNum());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static HRegionLocation toRegionLocation(HBaseProtos.RegionLocation proto) {
|
||||
org.apache.hadoop.hbase.client.RegionInfo regionInfo = toRegionInfo(proto.getRegionInfo());
|
||||
ServerName serverName = proto.hasServerName() ? toServerName(proto.getServerName()) : null;
|
||||
return new HRegionLocation(regionInfo, serverName, proto.getSeqNum());
|
||||
}
|
||||
|
||||
public static List<SnapshotDescription> toSnapshotDescriptionList(
|
||||
GetCompletedSnapshotsResponse response, Pattern pattern) {
|
||||
return response.getSnapshotsList().stream().map(ProtobufUtil::createSnapshotDesc)
|
||||
|
|
|
@ -268,3 +268,9 @@ message FlushedRegionSequenceId {
|
|||
message FlushedSequenceId {
|
||||
repeated FlushedRegionSequenceId regionSequenceId = 1;
|
||||
}
|
||||
|
||||
message RegionLocation {
|
||||
required RegionInfo region_info = 1;
|
||||
optional ServerName server_name = 2;
|
||||
required int64 seq_num = 3;
|
||||
}
|
|
@ -441,11 +441,14 @@ message DisablePeerStateData {
|
|||
}
|
||||
|
||||
enum ReopenTableRegionsState {
|
||||
REOPEN_TABLE_REGIONS_REOPEN_ALL_REGIONS = 1;
|
||||
REOPEN_TABLE_REGIONS_GET_REGIONS = 1;
|
||||
REOPEN_TABLE_REGIONS_REOPEN_REGIONS = 2;
|
||||
REOPEN_TABLE_REGIONS_CONFIRM_REOPENED = 3;
|
||||
}
|
||||
|
||||
message ReopenTableRegionsStateData {
|
||||
required TableName table_name = 1;
|
||||
repeated RegionLocation region = 2;
|
||||
}
|
||||
|
||||
enum InitMetaState {
|
||||
|
|
|
@ -695,18 +695,6 @@ public class AssignmentManager implements ServerListener {
|
|||
return procs.toArray(UNASSIGN_PROCEDURE_ARRAY_TYPE);
|
||||
}
|
||||
|
||||
public MoveRegionProcedure[] createReopenProcedures(final Collection<RegionInfo> regionInfo)
|
||||
throws IOException {
|
||||
final MoveRegionProcedure[] procs = new MoveRegionProcedure[regionInfo.size()];
|
||||
int index = 0;
|
||||
for (RegionInfo hri: regionInfo) {
|
||||
final ServerName serverName = regionStates.getRegionServerOfRegion(hri);
|
||||
final RegionPlan plan = new RegionPlan(hri, serverName, serverName);
|
||||
procs[index++] = createMoveRegionProcedure(plan);
|
||||
}
|
||||
return procs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by things like DisableTableProcedure to get a list of UnassignProcedure
|
||||
* to unassign the regions of the table.
|
||||
|
@ -745,22 +733,21 @@ public class AssignmentManager implements ServerListener {
|
|||
return proc;
|
||||
}
|
||||
|
||||
public MoveRegionProcedure createMoveRegionProcedure(final RegionPlan plan)
|
||||
throws HBaseIOException {
|
||||
private MoveRegionProcedure createMoveRegionProcedure(RegionPlan plan) throws HBaseIOException {
|
||||
if (plan.getRegionInfo().getTable().isSystemTable()) {
|
||||
List<ServerName> exclude = getExcludedServersForSystemTable();
|
||||
if (plan.getDestination() != null && exclude.contains(plan.getDestination())) {
|
||||
try {
|
||||
LOG.info("Can not move " + plan.getRegionInfo() + " to " + plan.getDestination()
|
||||
+ " because the server is not with highest version");
|
||||
LOG.info("Can not move " + plan.getRegionInfo() + " to " + plan.getDestination() +
|
||||
" because the server is not with highest version");
|
||||
plan.setDestination(getBalancer().randomAssignment(plan.getRegionInfo(),
|
||||
this.master.getServerManager().createDestinationServersList(exclude)));
|
||||
this.master.getServerManager().createDestinationServersList(exclude)));
|
||||
} catch (HBaseIOException e) {
|
||||
LOG.warn(e.toString(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return new MoveRegionProcedure(getProcedureEnvironment(), plan);
|
||||
return new MoveRegionProcedure(getProcedureEnvironment(), plan, true);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -56,11 +56,14 @@ public class MoveRegionProcedure extends AbstractStateMachineRegionProcedure<Mov
|
|||
}
|
||||
|
||||
/**
|
||||
* @throws IOException If the cluster is offline or master is stopping or if table is disabled
|
||||
* or non-existent.
|
||||
* @param check whether we should do some checks in the constructor. We will skip the checks if we
|
||||
* are reopening a region as this may fail the whole procedure and cause stuck. We will
|
||||
* do the check later when actually executing the procedure so not a big problem.
|
||||
* @throws IOException If the cluster is offline or master is stopping or if table is disabled or
|
||||
* non-existent.
|
||||
*/
|
||||
public MoveRegionProcedure(final MasterProcedureEnv env, final RegionPlan plan)
|
||||
throws HBaseIOException {
|
||||
public MoveRegionProcedure(MasterProcedureEnv env, RegionPlan plan, boolean check)
|
||||
throws HBaseIOException {
|
||||
super(env, plan.getRegionInfo());
|
||||
this.plan = plan;
|
||||
preflightChecks(env, true);
|
||||
|
@ -70,9 +73,7 @@ public class MoveRegionProcedure extends AbstractStateMachineRegionProcedure<Mov
|
|||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final MoveRegionState state)
|
||||
throws InterruptedException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
LOG.trace("{} execute state={}", this, state);
|
||||
switch (state) {
|
||||
case MOVE_REGION_PREPARE:
|
||||
// Check context again and that region is online; do it here after we have lock on region.
|
||||
|
|
|
@ -116,11 +116,13 @@ public class RegionStateStore {
|
|||
|
||||
final ServerName lastHost = hrl.getServerName();
|
||||
final ServerName regionLocation = getRegionServer(result, replicaId);
|
||||
final long openSeqNum = -1;
|
||||
final long openSeqNum = hrl.getSeqNum();
|
||||
|
||||
// TODO: move under trace, now is visible for debugging
|
||||
LOG.info("Load hbase:meta entry region={}, regionState={}, lastHost={}, " +
|
||||
"regionLocation={}", regionInfo.getEncodedName(), state, lastHost, regionLocation);
|
||||
LOG.info(
|
||||
"Load hbase:meta entry region={}, regionState={}, lastHost={}, " +
|
||||
"regionLocation={}, openSeqNum={}",
|
||||
regionInfo.getEncodedName(), state, lastHost, regionLocation, openSeqNum);
|
||||
visitor.visitRegionState(result, regionInfo, state, regionLocation, lastHost, openSeqNum);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,8 +35,9 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
|
@ -570,18 +572,6 @@ public class RegionStates {
|
|||
return !getTableRegionStates(tableName).isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns regions for a table which are open or about to be open (OPEN or OPENING)
|
||||
*/
|
||||
public List<RegionInfo> getOpenRegionsOfTable(final TableName table) {
|
||||
// We want to get regions which are already open on the cluster or are about to be open.
|
||||
// The use-case is for identifying regions which need to be re-opened to ensure they see some
|
||||
// new configuration. Regions in OPENING now are presently being opened by a RS, so we can
|
||||
// assume that they will imminently be OPEN but may not see our configuration change
|
||||
return getRegionsOfTable(
|
||||
table, (state) -> state.isInState(State.OPEN) || state.isInState(State.OPENING));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return online regions of table; does not include OFFLINE or SPLITTING regions.
|
||||
*/
|
||||
|
@ -589,28 +579,109 @@ public class RegionStates {
|
|||
return getRegionsOfTable(table, false);
|
||||
}
|
||||
|
||||
private HRegionLocation createRegionForReopen(RegionStateNode node) {
|
||||
synchronized (node) {
|
||||
if (!include(node, false)) {
|
||||
return null;
|
||||
}
|
||||
if (node.isInState(State.OPEN)) {
|
||||
return new HRegionLocation(node.getRegionInfo(), node.getRegionLocation(),
|
||||
node.getOpenSeqNum());
|
||||
} else if (node.isInState(State.OPENING)) {
|
||||
return new HRegionLocation(node.getRegionInfo(), node.getRegionLocation(), -1);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the regions to be reopened when modifying a table.
|
||||
* <p/>
|
||||
* Notice that the {@code openSeqNum} in the returned HRegionLocation is also used to indicate the
|
||||
* state of this region, positive means the region is in {@link State#OPEN}, -1 means
|
||||
* {@link State#OPENING}. And for regions in other states we do not need reopen them.
|
||||
*/
|
||||
public List<HRegionLocation> getRegionsOfTableForReopen(TableName tableName) {
|
||||
return getTableRegionStateNodes(tableName).stream().map(this::createRegionForReopen)
|
||||
.filter(r -> r != null).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the region has been reopened. The meaning of the {@link HRegionLocation} is the
|
||||
* same with {@link #getRegionsOfTableForReopen(TableName)}.
|
||||
* <p/>
|
||||
* For a region which is in {@link State#OPEN} before, if the region state is changed or the open
|
||||
* seq num is changed, we can confirm that it has been reopened.
|
||||
* <p/>
|
||||
* For a region which is in {@link State#OPENING} before, usually it will be in {@link State#OPEN}
|
||||
* now and we will schedule a MRP to reopen it. But there are several exceptions:
|
||||
* <ul>
|
||||
* <li>The region is in state other than {@link State#OPEN} or {@link State#OPENING}.</li>
|
||||
* <li>The location of the region has been changed</li>
|
||||
* </ul>
|
||||
* Of course the region could still be in {@link State#OPENING} state and still on the same
|
||||
* server, then here we will still return a {@link HRegionLocation} for it, just like
|
||||
* {@link #getRegionsOfTableForReopen(TableName)}.
|
||||
* @param oldLoc the previous state/location of this region
|
||||
* @return null if the region has been reopened, otherwise a new {@link HRegionLocation} which
|
||||
* means we still need to reopen the region.
|
||||
* @see #getRegionsOfTableForReopen(TableName)
|
||||
*/
|
||||
public HRegionLocation checkReopened(HRegionLocation oldLoc) {
|
||||
RegionStateNode node = getRegionStateNode(oldLoc.getRegion());
|
||||
synchronized (node) {
|
||||
if (oldLoc.getSeqNum() >= 0) {
|
||||
// in OPEN state before
|
||||
if (node.isInState(State.OPEN)) {
|
||||
if (node.getOpenSeqNum() > oldLoc.getSeqNum()) {
|
||||
// normal case, the region has been reopened
|
||||
return null;
|
||||
} else {
|
||||
// the open seq num does not change, need to reopen again
|
||||
return new HRegionLocation(node.getRegionInfo(), node.getRegionLocation(),
|
||||
node.getOpenSeqNum());
|
||||
}
|
||||
} else {
|
||||
// the state has been changed so we can make sure that the region has been reopened(not
|
||||
// finished maybe, but not a problem).
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
// in OPENING state before
|
||||
if (!node.isInState(State.OPEN, State.OPENING)) {
|
||||
// not in OPEN or OPENING state, then we can make sure that the region has been
|
||||
// reopened(not finished maybe, but not a problem)
|
||||
return null;
|
||||
} else {
|
||||
if (!node.getRegionLocation().equals(oldLoc.getServerName())) {
|
||||
// the region has been moved, so we can make sure that the region has been reopened.
|
||||
return null;
|
||||
}
|
||||
// normal case, we are still in OPENING state, or the reopen has been opened and the state
|
||||
// is changed to OPEN.
|
||||
long openSeqNum = node.isInState(State.OPEN) ? node.getOpenSeqNum() : -1;
|
||||
return new HRegionLocation(node.getRegionInfo(), node.getRegionLocation(), openSeqNum);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return online regions of table; does not include OFFLINE or SPLITTING regions.
|
||||
*/
|
||||
public List<RegionInfo> getRegionsOfTable(final TableName table, boolean offline) {
|
||||
return getRegionsOfTable(table, (state) -> include(state, offline));
|
||||
public List<RegionInfo> getRegionsOfTable(TableName table, boolean offline) {
|
||||
return getRegionsOfTable(table, state -> include(state, offline));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return the regions of the table; does not include OFFLINE unless you set
|
||||
* <code>offline</code> to true. Does not include regions that are in the
|
||||
* {@link State#SPLIT} state.
|
||||
* <code>offline</code> to true. Does not include regions that are in the
|
||||
* {@link State#SPLIT} state.
|
||||
*/
|
||||
public List<RegionInfo> getRegionsOfTable(
|
||||
final TableName table, Predicate<RegionStateNode> filter) {
|
||||
final ArrayList<RegionStateNode> nodes = getTableRegionStateNodes(table);
|
||||
final ArrayList<RegionInfo> hris = new ArrayList<RegionInfo>(nodes.size());
|
||||
for (RegionStateNode node: nodes) {
|
||||
if (filter.test(node)) {
|
||||
hris.add(node.getRegionInfo());
|
||||
}
|
||||
}
|
||||
return hris;
|
||||
private List<RegionInfo> getRegionsOfTable(TableName table, Predicate<RegionStateNode> filter) {
|
||||
return getTableRegionStateNodes(table).stream().filter(filter).map(n -> n.getRegionInfo())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.io.IOException;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -43,6 +42,7 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyTableState;
|
||||
|
@ -56,8 +56,6 @@ public class ModifyTableProcedure
|
|||
private TableDescriptor modifiedTableDescriptor;
|
||||
private boolean deleteColumnFamilyInModify;
|
||||
|
||||
private Boolean traceEnabled = null;
|
||||
|
||||
public ModifyTableProcedure() {
|
||||
super();
|
||||
initilize();
|
||||
|
@ -79,62 +77,57 @@ public class ModifyTableProcedure
|
|||
|
||||
private void initilize() {
|
||||
this.unmodifiedTableDescriptor = null;
|
||||
this.traceEnabled = null;
|
||||
this.deleteColumnFamilyInModify = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final ModifyTableState state)
|
||||
throws InterruptedException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
||||
LOG.trace("{} execute state={}", this, state);
|
||||
try {
|
||||
switch (state) {
|
||||
case MODIFY_TABLE_PREPARE:
|
||||
prepareModify(env);
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_PRE_OPERATION);
|
||||
break;
|
||||
case MODIFY_TABLE_PRE_OPERATION:
|
||||
preModify(env, state);
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR);
|
||||
break;
|
||||
case MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR:
|
||||
updateTableDescriptor(env);
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_REMOVE_REPLICA_COLUMN);
|
||||
break;
|
||||
case MODIFY_TABLE_REMOVE_REPLICA_COLUMN:
|
||||
updateReplicaColumnsIfNeeded(env, unmodifiedTableDescriptor, modifiedTableDescriptor);
|
||||
if (deleteColumnFamilyInModify) {
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_DELETE_FS_LAYOUT);
|
||||
} else {
|
||||
case MODIFY_TABLE_PREPARE:
|
||||
prepareModify(env);
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_PRE_OPERATION);
|
||||
break;
|
||||
case MODIFY_TABLE_PRE_OPERATION:
|
||||
preModify(env, state);
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR);
|
||||
break;
|
||||
case MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR:
|
||||
updateTableDescriptor(env);
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_REMOVE_REPLICA_COLUMN);
|
||||
break;
|
||||
case MODIFY_TABLE_REMOVE_REPLICA_COLUMN:
|
||||
updateReplicaColumnsIfNeeded(env, unmodifiedTableDescriptor, modifiedTableDescriptor);
|
||||
if (deleteColumnFamilyInModify) {
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_DELETE_FS_LAYOUT);
|
||||
} else {
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_POST_OPERATION);
|
||||
}
|
||||
break;
|
||||
case MODIFY_TABLE_DELETE_FS_LAYOUT:
|
||||
deleteFromFs(env, unmodifiedTableDescriptor, modifiedTableDescriptor);
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_POST_OPERATION);
|
||||
}
|
||||
break;
|
||||
case MODIFY_TABLE_DELETE_FS_LAYOUT:
|
||||
deleteFromFs(env, unmodifiedTableDescriptor, modifiedTableDescriptor);
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_POST_OPERATION);
|
||||
break;
|
||||
case MODIFY_TABLE_POST_OPERATION:
|
||||
postModify(env, state);
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS);
|
||||
break;
|
||||
case MODIFY_TABLE_REOPEN_ALL_REGIONS:
|
||||
if (env.getAssignmentManager().isTableEnabled(getTableName())) {
|
||||
addChildProcedure(env.getAssignmentManager()
|
||||
.createReopenProcedures(getOpenRegionInfoList(env)));
|
||||
}
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
break;
|
||||
case MODIFY_TABLE_POST_OPERATION:
|
||||
postModify(env, state);
|
||||
setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS);
|
||||
break;
|
||||
case MODIFY_TABLE_REOPEN_ALL_REGIONS:
|
||||
if (env.getAssignmentManager().isTableEnabled(getTableName())) {
|
||||
addChildProcedure(new ReopenTableRegionsProcedure(getTableName()));
|
||||
}
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (isRollbackSupported(state)) {
|
||||
setFailure("master-modify-table", e);
|
||||
} else {
|
||||
LOG.warn("Retriable error trying to modify table=" + getTableName() +
|
||||
" (in state=" + state + ")", e);
|
||||
LOG.warn("Retriable error trying to modify table={} (in state={})", getTableName(), state,
|
||||
e);
|
||||
}
|
||||
}
|
||||
return Flow.HAS_MORE_STATE;
|
||||
|
@ -172,7 +165,7 @@ public class ModifyTableProcedure
|
|||
|
||||
@Override
|
||||
protected ModifyTableState getState(final int stateId) {
|
||||
return ModifyTableState.valueOf(stateId);
|
||||
return ModifyTableState.forNumber(stateId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -293,22 +286,6 @@ public class ModifyTableProcedure
|
|||
env.getMasterServices().getTableDescriptors().add(modifiedTableDescriptor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Undo the descriptor change (for rollback)
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
**/
|
||||
private void restoreTableDescriptor(final MasterProcedureEnv env) throws IOException {
|
||||
env.getMasterServices().getTableDescriptors().add(unmodifiedTableDescriptor);
|
||||
|
||||
// delete any new column families from the modifiedTableDescriptor.
|
||||
deleteFromFs(env, modifiedTableDescriptor, unmodifiedTableDescriptor);
|
||||
|
||||
// Make sure regions are opened after table descriptor is updated.
|
||||
//reOpenAllRegionsIfTableIsOnline(env);
|
||||
// TODO: NUKE ROLLBACK!!!!
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes from hdfs the families that are not longer present in the new table descriptor.
|
||||
* @param env MasterProcedureEnv
|
||||
|
@ -393,18 +370,6 @@ public class ModifyTableProcedure
|
|||
runCoprocessorAction(env, state);
|
||||
}
|
||||
|
||||
/**
|
||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
||||
* retrieve it.
|
||||
* @return traceEnabled whether the trace is enabled
|
||||
*/
|
||||
private Boolean isTraceEnabled() {
|
||||
if (traceEnabled == null) {
|
||||
traceEnabled = LOG.isTraceEnabled();
|
||||
}
|
||||
return traceEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Coprocessor Action.
|
||||
* @param env MasterProcedureEnv
|
||||
|
@ -438,13 +403,4 @@ public class ModifyTableProcedure
|
|||
private List<RegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
|
||||
return env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches all open or soon to be open Regions for a table. Cache the result of this method if
|
||||
* you need to use it multiple times. Be aware that it may change over in between calls to this
|
||||
* procedure.
|
||||
*/
|
||||
private List<RegionInfo> getOpenRegionInfoList(final MasterProcedureEnv env) throws IOException {
|
||||
return env.getAssignmentManager().getRegionStates().getOpenRegionsOfTable(getTableName());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,14 @@
|
|||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
|
@ -31,8 +38,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
|
||||
|
||||
/**
|
||||
* Used for non table procedures to reopen the regions for a table. For example,
|
||||
* {@link org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure}.
|
||||
* Used for reopening the regions for a table.
|
||||
* <p/>
|
||||
* Currently we use {@link MoveRegionProcedure} to reopen regions.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReopenTableRegionsProcedure
|
||||
|
@ -42,6 +50,8 @@ public class ReopenTableRegionsProcedure
|
|||
|
||||
private TableName tableName;
|
||||
|
||||
private List<HRegionLocation> regions = Collections.emptyList();
|
||||
|
||||
public ReopenTableRegionsProcedure() {
|
||||
}
|
||||
|
||||
|
@ -59,19 +69,53 @@ public class ReopenTableRegionsProcedure
|
|||
return TableOperationType.REGION_EDIT;
|
||||
}
|
||||
|
||||
private MoveRegionProcedure createReopenProcedure(MasterProcedureEnv env, HRegionLocation loc) {
|
||||
try {
|
||||
return new MoveRegionProcedure(env,
|
||||
new RegionPlan(loc.getRegion(), loc.getServerName(), loc.getServerName()), false);
|
||||
} catch (HBaseIOException e) {
|
||||
// we skip the checks so this should not happen
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
switch (state) {
|
||||
case REOPEN_TABLE_REGIONS_REOPEN_ALL_REGIONS:
|
||||
try {
|
||||
addChildProcedure(env.getAssignmentManager().createReopenProcedures(
|
||||
env.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName)));
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to schedule reopen procedures for {}", tableName, e);
|
||||
throw new ProcedureSuspendedException();
|
||||
case REOPEN_TABLE_REGIONS_GET_REGIONS:
|
||||
if (!env.getAssignmentManager().isTableEnabled(tableName)) {
|
||||
LOG.info("Table {} is disabled, give up reopening its regions");
|
||||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
return Flow.NO_MORE_STATE;
|
||||
regions =
|
||||
env.getAssignmentManager().getRegionStates().getRegionsOfTableForReopen(tableName);
|
||||
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
|
||||
addChildProcedure(regions.stream().filter(l -> l.getSeqNum() >= 0)
|
||||
.map(l -> createReopenProcedure(env, l)).toArray(MoveRegionProcedure[]::new));
|
||||
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED:
|
||||
regions = regions.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
|
||||
.filter(l -> l != null).collect(Collectors.toList());
|
||||
if (regions.isEmpty()) {
|
||||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
if (regions.stream().anyMatch(l -> l.getSeqNum() >= 0)) {
|
||||
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
}
|
||||
LOG.info("There are still {} region(s) which need to be reopened for table {} are in " +
|
||||
"OPENING state, try again later", regions.size(), tableName);
|
||||
// All the regions need to reopen are in OPENING state which means we can not schedule any
|
||||
// MRPs. Then sleep for one second, and yield the procedure to let other procedures run
|
||||
// first and hope next time we can get some regions in other state to make progress.
|
||||
// TODO: add a delay for ProcedureYieldException so that we do not need to sleep here which
|
||||
// blocks a procedure worker.
|
||||
Thread.sleep(1000);
|
||||
throw new ProcedureYieldException();
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
|
@ -95,20 +139,24 @@ public class ReopenTableRegionsProcedure
|
|||
|
||||
@Override
|
||||
protected ReopenTableRegionsState getInitialState() {
|
||||
return ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_ALL_REGIONS;
|
||||
return ReopenTableRegionsState.REOPEN_TABLE_REGIONS_GET_REGIONS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.serializeStateData(serializer);
|
||||
serializer.serialize(ReopenTableRegionsStateData.newBuilder()
|
||||
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build());
|
||||
ReopenTableRegionsStateData.Builder builder = ReopenTableRegionsStateData.newBuilder()
|
||||
.setTableName(ProtobufUtil.toProtoTableName(tableName));
|
||||
regions.stream().map(ProtobufUtil::toRegionLocation).forEachOrdered(builder::addRegion);
|
||||
serializer.serialize(builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.deserializeStateData(serializer);
|
||||
tableName = ProtobufUtil
|
||||
.toTableName(serializer.deserialize(ReopenTableRegionsStateData.class).getTableName());
|
||||
ReopenTableRegionsStateData data = serializer.deserialize(ReopenTableRegionsStateData.class);
|
||||
tableName = ProtobufUtil.toTableName(data.getTableName());
|
||||
regions = data.getRegionList().stream().map(ProtobufUtil::toRegionLocation)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue