diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
index cc3b76537bd..40612e9b202 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
@@ -103,6 +103,13 @@ public class TableState {
     return isInStates(State.ENABLED);
   }
 
+  /**
+   * @return True if table is {@link State#ENABLING}.
+   */
+  public boolean isEnabling() {
+    return isInStates(State.ENABLING);
+  }
+
   /**
    * @return True if {@link State#ENABLED} or {@link State#ENABLING}
    */
@@ -117,6 +124,13 @@ public class TableState {
     return isInStates(State.DISABLED);
   }
 
+  /**
+   * @return True if table is disabling.
+   */
+  public boolean isDisabling() {
+    return isInStates(State.DISABLING);
+  }
+
   /**
    * @return True if {@link State#DISABLED} or {@link State#DISABLED}
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
index f5caff74089..685a73e1e7f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -107,7 +108,7 @@ public class DisableTableProcedure
         break;
       case DISABLE_TABLE_MARK_REGIONS_OFFLINE:
         addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName));
-        setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE);
+        setNextState(DisableTableState.DISABLE_TABLE_ADD_REPLICATION_BARRIER);
         break;
       case DISABLE_TABLE_ADD_REPLICATION_BARRIER:
         if (env.getMasterServices().getTableDescriptors().get(tableName)
@@ -119,7 +120,8 @@ public class DisableTableProcedure
             .getRegionsOfTable(tableName)) {
             long maxSequenceId =
               WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
-            mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, maxSequenceId,
+            long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
+            mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum,
               EnvironmentEdgeManager.currentTime()));
           }
         }
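Not part of the patch: the barrier value written in the DISABLE_TABLE_ADD_REPLICATION_BARRIER step above follows a small rule worth spelling out. A region that has written data gets a barrier of maxSequenceId + 1, the sequence id a re-opened region would continue from; a region with no recorded sequence id gets HConstants.NO_SEQNUM. A minimal standalone sketch of that rule, assuming NO_SEQNUM is -1 as in HConstants:

// Standalone sketch of the barrier rule used above; not the procedure's actual code.
final class ReplicationBarrierRule {
  // Assumed to mirror org.apache.hadoop.hbase.HConstants.NO_SEQNUM.
  private static final long NO_SEQNUM = -1L;

  static long barrierFor(long maxSequenceId) {
    // One past the region's highest used sequence id, or "no barrier" for an untouched region.
    return maxSequenceId > 0 ? maxSequenceId + 1 : NO_SEQNUM;
  }
}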
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 8bedeffac33..0b9efcefa78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,17 +18,16 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
-import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
@@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +54,9 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState>
   // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
   // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
   // should not forget to check whether the map is empty at last, if not you should call
@@ -192,26 +229,13 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState>
-      for (Pair<String, Long> name2Barrier : MetaTableAccessor
-        .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
-        addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
-          queueStorage);
-      }
-    } else {
-      for (RegionInfo region : regionStates.getRegionsOfTable(tableName, true)) {
-        long maxSequenceId =
-          WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
-        addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
-      }
+    for (Pair<String, Long> name2Barrier : MetaTableAccessor
+      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+        queueStorage);
     }
   }
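The rewritten loop above always derives a region's last pushed sequence id from its newest replication barrier in meta (lastBarrier - 1) instead of reading max sequence ids from the filesystem, and the two tests added below pin a table in DISABLING/ENABLING and assert that adding a serial peer blocks until that state settles. A rough sketch of such a settling check built on the new TableState predicates; the class and method names, the polling loop, and the use of TableStateManager#getTableState are illustrative assumptions, not the procedure's actual waiting code:

import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;

// Illustration only: poll until the table has left the transient ENABLING/DISABLING states,
// then return the settled state so the caller knows the barriers in meta are stable.
final class WaitForSettledTableState {
  static TableState waitUntilSettled(TableStateManager tsm, TableName tn, long sleepMs)
      throws IOException {
    for (;;) {
      TableState state = tsm.getTableState(tn);
      if (!state.isEnabling() && !state.isDisabling()) {
        return state; // only ENABLED or DISABLED remain
      }
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(e);
        throw iioe;
      }
    }
  }
}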
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
index 317c120a271..d3d6cbe837c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.Collections;
 import org.apache.hadoop.fs.Path;
@@ -26,6 +28,8 @@ import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -192,4 +196,74 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
     waitUntilReplicationDone(100);
     checkOrder(100);
   }
+
+  @Test
+  public void testDisablingTable() throws Exception {
+    TableName tableName = createTable();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    UTIL.getAdmin().disableTable(tableName);
+    rollAllWALs();
+    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
+    tsm.setTableState(tableName, TableState.State.DISABLING);
+    Thread t = new Thread(() -> {
+      try {
+        addPeer(true);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    t.start();
+    Thread.sleep(5000);
+    // we will wait on the disabling table so the thread should still be alive.
+    assertTrue(t.isAlive());
+    tsm.setTableState(tableName, TableState.State.DISABLED);
+    t.join();
+    UTIL.getAdmin().enableTable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+  }
+
+  @Test
+  public void testEnablingTable() throws Exception {
+    TableName tableName = createTable();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+    moveRegionAndArchiveOldWals(region, rs);
+    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
+    tsm.setTableState(tableName, TableState.State.ENABLING);
+    Thread t = new Thread(() -> {
+      try {
+        addPeer(true);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    t.start();
+    Thread.sleep(5000);
+    // we will wait on the enabling table so the thread should still be alive.
+    assertTrue(t.isAlive());
+    tsm.setTableState(tableName, TableState.State.ENABLED);
+    t.join();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+  }
 }
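Taken together, the two master-side changes mean that for a disabled table the last pushed sequence ids come from the barriers written at disable time rather than from WAL files on disk. A small worked example of that arithmetic, with invented numbers not taken from the patch or any test run:

// Invented numbers, illustration only: follow one region through both changes.
public class BarrierArithmeticExample {
  public static void main(String[] args) {
    long maxSequenceId = 41;              // highest sequence id the region has used
    long barrier = maxSequenceId + 1;     // DisableTableProcedure writes 42 to meta
    long lastPushedSeqId = barrier - 1;   // ModifyPeerProcedure later records 42 - 1 = 41
    System.out.println("barrier=" + barrier + " lastPushedSeqId=" + lastPushedSeqId);
  }
}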