HBASE-20367 Write a replication barrier for regions when disabling a table

This commit is contained in:
zhangduo 2018-04-10 22:14:02 +08:00
parent a2b9172771
commit 37e5b0b1b7
3 changed files with 58 additions and 39 deletions

View File

@ -1976,7 +1976,7 @@ public class MetaTableAccessor {
.setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build()); .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
} }
private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts) public static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
throws IOException { throws IOException {
Put put = new Put(regionInfo.getRegionName(), ts); Put put = new Put(regionInfo.getRegionName(), ts);
addReplicationBarrier(put, openSeqNum); addReplicationBarrier(put, openSeqNum);

View File

@ -171,6 +171,7 @@ enum DisableTableState {
DISABLE_TABLE_MARK_REGIONS_OFFLINE = 4; DISABLE_TABLE_MARK_REGIONS_OFFLINE = 4;
DISABLE_TABLE_SET_DISABLED_TABLE_STATE = 5; DISABLE_TABLE_SET_DISABLED_TABLE_STATE = 5;
DISABLE_TABLE_POST_OPERATION = 6; DISABLE_TABLE_POST_OPERATION = 6;
DISABLE_TABLE_ADD_REPLICATION_BARRIER = 7;
} }
message DisableTableStateData { message DisableTableStateData {

View File

@ -19,20 +19,25 @@
package org.apache.hadoop.hbase.master.procedure; package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.BufferedMutator;
import org.slf4j.Logger; import org.apache.hadoop.hbase.client.RegionInfo;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisableTableState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisableTableState;
@ -81,48 +86,61 @@ public class DisableTableProcedure
@Override @Override
protected Flow executeFromState(final MasterProcedureEnv env, final DisableTableState state) protected Flow executeFromState(final MasterProcedureEnv env, final DisableTableState state)
throws InterruptedException { throws InterruptedException {
if (isTraceEnabled()) { LOG.trace("{} execute state={}", this, state);
LOG.trace(this + " execute state=" + state);
}
try { try {
switch (state) { switch (state) {
case DISABLE_TABLE_PREPARE: case DISABLE_TABLE_PREPARE:
if (prepareDisable(env)) { if (prepareDisable(env)) {
setNextState(DisableTableState.DISABLE_TABLE_PRE_OPERATION); setNextState(DisableTableState.DISABLE_TABLE_PRE_OPERATION);
} else { } else {
assert isFailed() : "disable should have an exception here"; assert isFailed() : "disable should have an exception here";
return Flow.NO_MORE_STATE;
}
break;
case DISABLE_TABLE_PRE_OPERATION:
preDisable(env, state);
setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLING_TABLE_STATE);
break;
case DISABLE_TABLE_SET_DISABLING_TABLE_STATE:
setTableStateToDisabling(env, tableName);
setNextState(DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE);
break;
case DISABLE_TABLE_MARK_REGIONS_OFFLINE:
addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName));
setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
break;
case DISABLE_TABLE_ADD_REPLICATION_BARRIER:
if (env.getMasterServices().getTableDescriptors().get(tableName)
.hasGlobalReplicationScope()) {
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
try (BufferedMutator mutator = env.getMasterServices().getConnection()
.getBufferedMutator(TableName.META_TABLE_NAME)) {
for (RegionInfo region : env.getAssignmentManager().getRegionStates()
.getRegionsOfTable(tableName)) {
long maxSequenceId =
WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, maxSequenceId,
EnvironmentEdgeManager.currentTime()));
}
}
}
setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
break;
case DISABLE_TABLE_SET_DISABLED_TABLE_STATE:
setTableStateToDisabled(env, tableName);
setNextState(DisableTableState.DISABLE_TABLE_POST_OPERATION);
break;
case DISABLE_TABLE_POST_OPERATION:
postDisable(env, state);
return Flow.NO_MORE_STATE; return Flow.NO_MORE_STATE;
} default:
break; throw new UnsupportedOperationException("Unhandled state=" + state);
case DISABLE_TABLE_PRE_OPERATION:
preDisable(env, state);
setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLING_TABLE_STATE);
break;
case DISABLE_TABLE_SET_DISABLING_TABLE_STATE:
setTableStateToDisabling(env, tableName);
setNextState(DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE);
break;
case DISABLE_TABLE_MARK_REGIONS_OFFLINE:
addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName));
setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
break;
case DISABLE_TABLE_SET_DISABLED_TABLE_STATE:
setTableStateToDisabled(env, tableName);
setNextState(DisableTableState.DISABLE_TABLE_POST_OPERATION);
break;
case DISABLE_TABLE_POST_OPERATION:
postDisable(env, state);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("Unhandled state=" + state);
} }
} catch (IOException e) { } catch (IOException e) {
if (isRollbackSupported(state)) { if (isRollbackSupported(state)) {
setFailure("master-disable-table", e); setFailure("master-disable-table", e);
} else { } else {
LOG.warn("Retriable error trying to disable table=" + tableName + LOG.warn("Retriable error trying to disable table={} (in state={})", tableName, state, e);
" (in state=" + state + ")", e);
} }
} }
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;