HBASE-20377 Deal with table in enabling and disabling state when modifying serial replication peer

zhangduo 2018-04-13 16:21:03 +08:00
parent 826909a59c
commit 5a633adffe
4 changed files with 146 additions and 32 deletions

TableState.java (org.apache.hadoop.hbase.client)

@@ -103,6 +103,13 @@ public class TableState {
     return isInStates(State.ENABLED);
   }
 
+  /**
+   * @return True if table is {@link State#ENABLING}.
+   */
+  public boolean isEnabling() {
+    return isInStates(State.ENABLING);
+  }
+
   /**
    * @return True if {@link State#ENABLED} or {@link State#ENABLING}
    */
@@ -117,6 +124,13 @@
     return isInStates(State.DISABLED);
   }
 
+  /**
+   * @return True if table is disabling.
+   */
+  public boolean isDisabling() {
+    return isInStates(State.DISABLING);
+  }
+
   /**
    * @return True if {@link State#DISABLED} or {@link State#DISABLED}
    */
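The new isEnabling()/isDisabling() accessors exist so callers can poll a table caught mid-transition instead of treating ENABLING/DISABLING as terminal states. A minimal sketch of that polling pattern, mirroring the needReopen loop added to ModifyPeerProcedure further down in this commit (tsm, tn and SLEEP_INTERVAL_MS follow the names used there; the helper itself is illustrative, not part of the patch):

  // Sketch only: spin until the table leaves the transient ENABLING state.
  // Returns true if it ended up ENABLED, false for any other terminal state.
  private boolean waitWhileEnabling(TableStateManager tsm, TableName tn)
      throws IOException, InterruptedException {
    for (;;) {
      TableState state = tsm.getTableState(tn);
      if (state.isEnabled()) {
        return true;
      }
      if (!state.isEnabling()) {
        return false;
      }
      Thread.sleep(SLEEP_INTERVAL_MS);
    }
  }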

DisableTableProcedure.java (org.apache.hadoop.hbase.master.procedure)

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -107,7 +108,7 @@ public class DisableTableProcedure
         break;
       case DISABLE_TABLE_MARK_REGIONS_OFFLINE:
         addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName));
-        setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
+        setNextState(DisableTableState.DISABLE_TABLE_ADD_REPLICATION_BARRIER);
         break;
       case DISABLE_TABLE_ADD_REPLICATION_BARRIER:
         if (env.getMasterServices().getTableDescriptors().get(tableName)
@@ -119,7 +120,8 @@ public class DisableTableProcedure
             .getRegionsOfTable(tableName)) {
             long maxSequenceId =
               WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
-            mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, maxSequenceId,
+            long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
+            mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum,
               EnvironmentEdgeManager.currentTime()));
           }
         }
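The new openSeqNum line changes what gets written as the replication barrier when a table is disabled: instead of the region's last persisted sequence id itself, the barrier now records the sequence id the region would open with next time (max + 1), falling back to HConstants.NO_SEQNUM when no sequence id has ever been persisted. A standalone sketch of just that rule (the nextBarrierSeqId helper and class name are illustrative, not part of the patch):

  import org.apache.hadoop.hbase.HConstants;

  // Sketch only: the barrier sequence id rule used above, pulled out as a helper.
  public final class BarrierSeqIdRule {

    // maxSequenceId is what WALSplitter.getMaxRegionSequenceId returns for the region dir.
    static long nextBarrierSeqId(long maxSequenceId) {
      // A region that never persisted a sequence id yields NO_SEQNUM (-1) rather than 1,
      // so no misleading barrier is written for it.
      return maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
    }
  }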

ModifyPeerProcedure.java (org.apache.hadoop.hbase.master.replication)

@@ -18,17 +18,16 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
-import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
@@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +54,9 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
   protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
 
+  // The sleep interval when waiting table to be enabled or disabled.
+  protected static final int SLEEP_INTERVAL_MS = 1000;
+
   protected ModifyPeerProcedure() {
   }
 
@@ -126,6 +127,27 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     throw new UnsupportedOperationException();
   }
 
+  // If the table is in enabling state, we need to wait until it is enabled and then reopen all its
+  // regions.
+  private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
+    for (;;) {
+      try {
+        TableState state = tsm.getTableState(tn);
+        if (state.isEnabled()) {
+          return true;
+        }
+        if (!state.isEnabling()) {
+          return false;
+        }
+        Thread.sleep(SLEEP_INTERVAL_MS);
+      } catch (TableStateNotFoundException e) {
+        return false;
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
+      }
+    }
+  }
+
   private void reopenRegions(MasterProcedureEnv env) throws IOException {
     ReplicationPeerConfig peerConfig = getNewPeerConfig();
     ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
@@ -142,15 +164,10 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
           ReplicationUtils.contains(oldPeerConfig, tn)) {
         continue;
       }
-      try {
-        if (!tsm.getTableState(tn).isEnabled()) {
-          continue;
-        }
-      } catch (TableStateNotFoundException e) {
-        continue;
-      }
-      addChildProcedure(env.getAssignmentManager().createReopenProcedures(
-        env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
+      if (needReopen(tsm, tn)) {
+        addChildProcedure(env.getAssignmentManager().createReopenProcedures(
+          env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
+      }
     }
   }
 
@@ -183,6 +200,26 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     }
   }
 
+  // If the table is currently disabling, then we need to wait until it is disabled. We will write
+  // replication barrier for a disabled table. And return whether we need to update the last pushed
+  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
+  // then we do not need to update last pushed sequence id for this table.
+  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
+      throws IOException {
+    for (;;) {
+      try {
+        if (!tsm.getTableState(tn).isDisabling()) {
+          return true;
+        }
+        Thread.sleep(SLEEP_INTERVAL_MS);
+      } catch (TableStateNotFoundException e) {
+        return false;
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
+      }
+    }
+  }
+
   // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
   // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
   // should not forget to check whether the map is empty at last, if not you should call
@@ -192,26 +229,13 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     TableStateManager tsm = env.getMasterServices().getTableStateManager();
     ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
     Connection conn = env.getMasterServices().getConnection();
-    RegionStates regionStates = env.getAssignmentManager().getRegionStates();
-    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    boolean isTableEnabled;
-    try {
-      isTableEnabled = tsm.getTableState(tableName).isEnabled();
-    } catch (TableStateNotFoundException e) {
-      return;
-    }
-    if (isTableEnabled) {
-      for (Pair<String, Long> name2Barrier : MetaTableAccessor
-        .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
-        addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
-          queueStorage);
-      }
-    } else {
-      for (RegionInfo region : regionStates.getRegionsOfTable(tableName, true)) {
-        long maxSequenceId =
-          WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
-        addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
-      }
-    }
+    if (!needSetLastPushedSequenceId(tsm, tableName)) {
+      return;
+    }
+    for (Pair<String, Long> name2Barrier : MetaTableAccessor
+      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+        queueStorage);
+    }
   }
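The comment in the last hunk spells out addToMap's contract (buffer encodedRegionName -> lastPushedSeqId pairs, flush to queue storage once the batch reaches UPDATE_LAST_SEQ_ID_BATCH_SIZE, and let the caller flush the remainder), but the method body itself is outside this diff. A sketch of the shape it plausibly has, assuming the peerId field inherited from AbstractPeerProcedure and the ReplicationQueueStorage.setLastSequenceIds call the comment already names; the signature and ReplicationException throws clause are assumptions, not copied from the patch:

  // Sketch only (assumed shape, not part of this diff): buffer
  // encodedRegionName -> lastPushedSeqId pairs and flush them to replication queue
  // storage once the batch is full; the caller flushes whatever remains afterwards.
  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long lastSeqId,
      ReplicationQueueStorage queueStorage) throws ReplicationException {
    if (lastSeqId > 0) {
      lastSeqIds.put(encodedRegionName, lastSeqId);
      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
        // peerId is the field inherited from AbstractPeerProcedure.
        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
        lastSeqIds.clear();
      }
    }
  }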

TestAddToSerialReplicationPeer.java (org.apache.hadoop.hbase.replication)

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.Collections;
 import org.apache.hadoop.fs.Path;
@@ -26,6 +28,8 @@ import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -192,4 +196,74 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
     waitUntilReplicationDone(100);
     checkOrder(100);
   }
+
+  @Test
+  public void testDisablingTable() throws Exception {
+    TableName tableName = createTable();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    UTIL.getAdmin().disableTable(tableName);
+    rollAllWALs();
+    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
+    tsm.setTableState(tableName, TableState.State.DISABLING);
+    Thread t = new Thread(() -> {
+      try {
+        addPeer(true);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    t.start();
+    Thread.sleep(5000);
+    // we will wait on the disabling table so the thread should still be alive.
+    assertTrue(t.isAlive());
+    tsm.setTableState(tableName, TableState.State.DISABLED);
+    t.join();
+    UTIL.getAdmin().enableTable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+  }
+
+  @Test
+  public void testEnablingTable() throws Exception {
+    TableName tableName = createTable();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+    moveRegionAndArchiveOldWals(region, rs);
+    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
+    tsm.setTableState(tableName, TableState.State.ENABLING);
+    Thread t = new Thread(() -> {
+      try {
+        addPeer(true);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    t.start();
+    Thread.sleep(5000);
+    // we will wait on the enabling table so the thread should still be alive.
+    assertTrue(t.isAlive());
+    tsm.setTableState(tableName, TableState.State.ENABLED);
+    t.join();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+  }
 }