HBASE-20285 Delete all last pushed sequence ids when removing a peer or removing the serial flag for a peer

This commit is contained in:
zhangduo 2018-03-26 22:17:00 +08:00
parent 15c398f7d2
commit 056c3395d9
12 changed files with 226 additions and 42 deletions

View File

@ -421,3 +421,13 @@ message UpdatePeerConfigStateData {
required ReplicationPeer peer_config = 1; required ReplicationPeer peer_config = 1;
optional ReplicationPeer old_peer_config = 2; optional ReplicationPeer old_peer_config = 2;
} }
message RemovePeerStateData {
optional ReplicationPeer peer_config = 1;
}
message EnablePeerStateData {
}
message DisablePeerStateData {
}

View File

@ -86,6 +86,11 @@ public interface ReplicationQueueStorage {
*/ */
void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException; void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;
/**
* Remove all the max sequence id record for the given peer.
* @param peerId peer id
*/
void removeLastSequenceIds(String peerId) throws ReplicationException;
/** /**
* Get the current position for a specific WAL in a given queue for a given regionserver. * Get the current position for a specific WAL in a given queue for a given regionserver.
* @param serverName the name of the regionserver * @param serverName the name of the regionserver

View File

@ -103,7 +103,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
*/ */
private final String hfileRefsZNode; private final String hfileRefsZNode;
private final String regionsZNode; @VisibleForTesting
final String regionsZNode;
public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) { public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf); super(zookeeper, conf);
@ -312,6 +313,40 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
} }
} }
@Override
public void removeLastSequenceIds(String peerId) throws ReplicationException {
String suffix = "-" + peerId;
try {
StringBuilder sb = new StringBuilder(regionsZNode);
int regionsZNodeLength = regionsZNode.length();
int levelOneLength = regionsZNodeLength + 3;
int levelTwoLength = levelOneLength + 3;
List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
// it is possible that levelOneDirs is null if we haven't write any last pushed sequence ids
// yet, so we need an extra check here.
if (CollectionUtils.isEmpty(levelOneDirs)) {
return;
}
for (String levelOne : levelOneDirs) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne);
for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo);
for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
if (znode.endsWith(suffix)) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode);
ZKUtil.deleteNode(zookeeper, sb.toString());
sb.setLength(levelTwoLength);
}
}
sb.setLength(levelOneLength);
}
sb.setLength(regionsZNodeLength);
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e);
}
}
@Override @Override
public long getWALPosition(ServerName serverName, String queueId, String fileName) public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException { throws ReplicationException {

View File

@ -32,15 +32,17 @@ import java.util.SortedSet;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
@ -71,7 +73,7 @@ public class TestZKReplicationQueueStorage {
} }
@After @After
public void tearDownAfterTest() throws ReplicationException { public void tearDownAfterTest() throws ReplicationException, KeeperException, IOException {
for (ServerName serverName : STORAGE.getListOfReplicators()) { for (ServerName serverName : STORAGE.getListOfReplicators()) {
for (String queue : STORAGE.getAllQueues(serverName)) { for (String queue : STORAGE.getAllQueues(serverName)) {
STORAGE.removeQueue(serverName, queue); STORAGE.removeQueue(serverName, queue);
@ -301,6 +303,29 @@ public class TestZKReplicationQueueStorage {
String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7"; String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7";
String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId; String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId;
String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId); String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
Assert.assertEquals(expectedPath, path); assertEquals(expectedPath, path);
}
@Test
public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception {
String peerId = "1";
String peerIdToDelete = "2";
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
STORAGE.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i));
STORAGE.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i));
}
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
}
STORAGE.removeLastSequenceIds(peerIdToDelete);
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
assertEquals(HConstants.NO_SEQNUM,
STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
}
} }
} }

View File

@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisablePeerStateData;
/** /**
* The procedure for disabling a replication peer. * The procedure for disabling a replication peer.
*/ */
@ -67,4 +70,16 @@ public class DisablePeerProcedure extends ModifyPeerProcedure {
cpHost.postDisableReplicationPeer(peerId); cpHost.postDisableReplicationPeer(peerId);
} }
} }
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(DisablePeerStateData.getDefaultInstance());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
serializer.deserialize(DisablePeerStateData.class);
}
} }

View File

@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnablePeerStateData;
/** /**
* The procedure for enabling a replication peer. * The procedure for enabling a replication peer.
*/ */
@ -67,4 +70,16 @@ public class EnablePeerProcedure extends ModifyPeerProcedure {
cpHost.postEnableReplicationPeer(peerId); cpHost.postEnableReplicationPeer(peerId);
} }
} }
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(EnablePeerStateData.getDefaultInstance());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
serializer.deserialize(EnablePeerStateData.class);
}
} }

View File

@ -18,13 +18,18 @@
package org.apache.hadoop.hbase.master.replication; package org.apache.hadoop.hbase.master.replication;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RemovePeerStateData;
/** /**
* The procedure for removing a replication peer. * The procedure for removing a replication peer.
*/ */
@ -33,6 +38,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
private static final Logger LOG = LoggerFactory.getLogger(RemovePeerProcedure.class); private static final Logger LOG = LoggerFactory.getLogger(RemovePeerProcedure.class);
private ReplicationPeerConfig peerConfig;
public RemovePeerProcedure() { public RemovePeerProcedure() {
} }
@ -51,7 +58,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
if (cpHost != null) { if (cpHost != null) {
cpHost.preRemoveReplicationPeer(peerId); cpHost.preRemoveReplicationPeer(peerId);
} }
env.getReplicationPeerManager().preRemovePeer(peerId); peerConfig = env.getReplicationPeerManager().preRemovePeer(peerId);
} }
@Override @Override
@ -63,10 +70,32 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
protected void postPeerModification(MasterProcedureEnv env) protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException { throws IOException, ReplicationException {
env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId); env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
if (peerConfig.isSerial()) {
env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
}
LOG.info("Successfully removed peer {}", peerId); LOG.info("Successfully removed peer {}", peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) { if (cpHost != null) {
cpHost.postRemoveReplicationPeer(peerId); cpHost.postRemoveReplicationPeer(peerId);
} }
} }
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
RemovePeerStateData.Builder builder = RemovePeerStateData.newBuilder();
if (peerConfig != null) {
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
}
serializer.serialize(builder.build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
RemovePeerStateData data = serializer.deserialize(RemovePeerStateData.class);
if (data.hasPeerConfig()) {
this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
}
}
} }

View File

@ -109,8 +109,8 @@ public class ReplicationPeerManager {
return desc; return desc;
} }
void preRemovePeer(String peerId) throws DoNotRetryIOException { ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
checkPeerExists(peerId); return checkPeerExists(peerId).getPeerConfig();
} }
void preEnablePeer(String peerId) throws DoNotRetryIOException { void preEnablePeer(String peerId) throws DoNotRetryIOException {
@ -220,6 +220,10 @@ public class ReplicationPeerManager {
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
} }
void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
queueStorage.removeLastSequenceIds(peerId);
}
void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
// Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
// on-going when the refresh peer config procedure is done, if a RS which has already been // on-going when the refresh peer config procedure is done, if a RS which has already been

View File

@ -107,6 +107,9 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
@Override @Override
protected void postPeerModification(MasterProcedureEnv env) protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException { throws IOException, ReplicationException {
if (oldPeerConfig.isSerial() && !peerConfig.isSerial()) {
env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
}
LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig); LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) { if (cpHost != null) {

View File

@ -26,8 +26,13 @@ import java.util.UUID;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@ -129,7 +134,10 @@ public class SerialReplicationTestBase {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
UTIL.getAdmin().removeReplicationPeer(PEER_ID); Admin admin = UTIL.getAdmin();
for (ReplicationPeerDescription pd : admin.listReplicationPeers()) {
admin.removeReplicationPeer(pd.getPeerId());
}
rollAllWALs(); rollAllWALs();
if (WRITER != null) { if (WRITER != null) {
WRITER.close(); WRITER.close();
@ -233,4 +241,13 @@ public class SerialReplicationTestBase {
assertEquals(expectedEntries, count); assertEquals(expectedEntries, count);
} }
} }
protected final TableName createTable() throws IOException, InterruptedException {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
return tableName;
}
} }

View File

@ -21,14 +21,11 @@ import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.Replication;
@ -88,11 +85,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
@Test @Test
public void testAddPeer() throws Exception { public void testAddPeer() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = createTable();
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) { try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@ -118,12 +111,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(); .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true); UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = createTable();
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) { try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@ -159,11 +147,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
.setReplicateAllUserTables(false).setSerial(true).build(); .setReplicateAllUserTables(false).setSerial(true).build();
UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true); UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = createTable();
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) { try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@ -190,11 +174,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
@Test @Test
public void testDisabledTable() throws Exception { public void testDisabledTable() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = createTable();
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) { try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));

View File

@ -65,11 +65,7 @@ public class TestSerialReplication extends SerialReplicationTestBase {
@Test @Test
public void testRegionMove() throws Exception { public void testRegionMove() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = createTable();
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) { try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@ -89,11 +85,7 @@ public class TestSerialReplication extends SerialReplicationTestBase {
@Test @Test
public void testRegionSplit() throws Exception { public void testRegionSplit() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = createTable();
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) { try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@ -204,4 +196,58 @@ public class TestSerialReplication extends SerialReplicationTestBase {
assertEquals(200, count); assertEquals(200, count);
} }
} }
@Test
public void testRemovePeerNothingReplicated() throws Exception {
TableName tableName = createTable();
String encodedRegionName =
UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
ReplicationQueueStorage queueStorage =
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
UTIL.getAdmin().removeReplicationPeer(PEER_ID);
assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
}
@Test
public void testRemovePeer() throws Exception {
TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
enablePeerAndWaitUntilReplicationDone(100);
checkOrder(100);
String encodedRegionName =
UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
ReplicationQueueStorage queueStorage =
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
UTIL.getAdmin().removeReplicationPeer(PEER_ID);
// confirm that we delete the last pushed sequence id
assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
}
@Test
public void testRemoveSerialFlag() throws Exception {
TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
enablePeerAndWaitUntilReplicationDone(100);
checkOrder(100);
String encodedRegionName =
UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
ReplicationQueueStorage queueStorage =
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
ReplicationPeerConfig peerConfig = UTIL.getAdmin().getReplicationPeerConfig(PEER_ID);
UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
ReplicationPeerConfig.newBuilder(peerConfig).setSerial(false).build());
// confirm that we delete the last pushed sequence id
assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
}
} }