HBASE-20285 Delete all last pushed sequence ids when removing a peer or removing the serial flag for a peer

This commit is contained in:
zhangduo 2018-03-26 22:17:00 +08:00
parent 83488b866f
commit ead569c951
12 changed files with 227 additions and 43 deletions

View File

@ -421,3 +421,13 @@ message UpdatePeerConfigStateData {
required ReplicationPeer peer_config = 1;
optional ReplicationPeer old_peer_config = 2;
}
message RemovePeerStateData {
optional ReplicationPeer peer_config = 1;
}
message EnablePeerStateData {
}
message DisablePeerStateData {
}

View File

@ -86,6 +86,11 @@ public interface ReplicationQueueStorage {
*/
void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;
/**
* Remove all the max sequence id record for the given peer.
* @param peerId peer id
*/
void removeLastSequenceIds(String peerId) throws ReplicationException;
/**
* Get the current position for a specific WAL in a given queue for a given regionserver.
* @param serverName the name of the regionserver

View File

@ -102,7 +102,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
*/
private final String hfileRefsZNode;
private final String regionsZNode;
@VisibleForTesting
final String regionsZNode;
public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
@ -311,6 +312,40 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
}
@Override
public void removeLastSequenceIds(String peerId) throws ReplicationException {
String suffix = "-" + peerId;
try {
StringBuilder sb = new StringBuilder(regionsZNode);
int regionsZNodeLength = regionsZNode.length();
int levelOneLength = regionsZNodeLength + 3;
int levelTwoLength = levelOneLength + 3;
List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
// it is possible that levelOneDirs is null if we haven't write any last pushed sequence ids
// yet, so we need an extra check here.
if (CollectionUtils.isEmpty(levelOneDirs)) {
return;
}
for (String levelOne : levelOneDirs) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne);
for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo);
for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
if (znode.endsWith(suffix)) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode);
ZKUtil.deleteNode(zookeeper, sb.toString());
sb.setLength(levelTwoLength);
}
}
sb.setLength(levelOneLength);
}
sb.setLength(regionsZNodeLength);
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e);
}
}
@Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {

View File

@ -32,15 +32,17 @@ import java.util.SortedSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@ -71,7 +73,7 @@ public class TestZKReplicationQueueStorage {
}
@After
public void tearDownAfterTest() throws ReplicationException {
public void tearDownAfterTest() throws ReplicationException, KeeperException, IOException {
for (ServerName serverName : STORAGE.getListOfReplicators()) {
for (String queue : STORAGE.getAllQueues(serverName)) {
STORAGE.removeQueue(serverName, queue);
@ -301,6 +303,29 @@ public class TestZKReplicationQueueStorage {
String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7";
String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId;
String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
Assert.assertEquals(expectedPath, path);
assertEquals(expectedPath, path);
}
@Test
public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception {
String peerId = "1";
String peerIdToDelete = "2";
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
STORAGE.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i));
STORAGE.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i));
}
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
}
STORAGE.removeLastSequenceIds(peerIdToDelete);
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
assertEquals(HConstants.NO_SEQNUM,
STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
}
}
}

View File

@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisablePeerStateData;
/**
* The procedure for disabling a replication peer.
*/
@ -67,4 +70,16 @@ public class DisablePeerProcedure extends ModifyPeerProcedure {
cpHost.postDisableReplicationPeer(peerId);
}
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(DisablePeerStateData.getDefaultInstance());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
serializer.deserialize(DisablePeerStateData.class);
}
}

View File

@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnablePeerStateData;
/**
* The procedure for enabling a replication peer.
*/
@ -67,4 +70,16 @@ public class EnablePeerProcedure extends ModifyPeerProcedure {
cpHost.postEnableReplicationPeer(peerId);
}
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(EnablePeerStateData.getDefaultInstance());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
serializer.deserialize(EnablePeerStateData.class);
}
}

View File

@ -18,13 +18,18 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RemovePeerStateData;
/**
* The procedure for removing a replication peer.
*/
@ -33,6 +38,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
private static final Logger LOG = LoggerFactory.getLogger(RemovePeerProcedure.class);
private ReplicationPeerConfig peerConfig;
public RemovePeerProcedure() {
}
@ -51,7 +58,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
if (cpHost != null) {
cpHost.preRemoveReplicationPeer(peerId);
}
env.getReplicationPeerManager().preRemovePeer(peerId);
peerConfig = env.getReplicationPeerManager().preRemovePeer(peerId);
}
@Override
@ -63,10 +70,32 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
if (peerConfig.isSerial()) {
env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
}
LOG.info("Successfully removed peer {}", peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.postRemoveReplicationPeer(peerId);
}
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
RemovePeerStateData.Builder builder = RemovePeerStateData.newBuilder();
if (peerConfig != null) {
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
}
serializer.serialize(builder.build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
RemovePeerStateData data = serializer.deserialize(RemovePeerStateData.class);
if (data.hasPeerConfig()) {
this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
}
}
}

View File

@ -109,8 +109,8 @@ public class ReplicationPeerManager {
return desc;
}
void preRemovePeer(String peerId) throws DoNotRetryIOException {
checkPeerExists(peerId);
ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
return checkPeerExists(peerId).getPeerConfig();
}
void preEnablePeer(String peerId) throws DoNotRetryIOException {
@ -220,6 +220,10 @@ public class ReplicationPeerManager {
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
}
void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
queueStorage.removeLastSequenceIds(peerId);
}
void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
// Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
// on-going when the refresh peer config procedure is done, if a RS which has already been

View File

@ -107,6 +107,9 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
@Override
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
if (oldPeerConfig.isSerial() && !peerConfig.isSerial()) {
env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
}
LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {

View File

@ -26,8 +26,13 @@ import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@ -129,7 +134,10 @@ public class SerialReplicationTestBase {
@After
public void tearDown() throws Exception {
UTIL.getAdmin().removeReplicationPeer(PEER_ID);
Admin admin = UTIL.getAdmin();
for (ReplicationPeerDescription pd : admin.listReplicationPeers()) {
admin.removeReplicationPeer(pd.getPeerId());
}
rollAllWALs();
if (WRITER != null) {
WRITER.close();
@ -233,4 +241,13 @@ public class SerialReplicationTestBase {
assertEquals(expectedEntries, count);
}
}
protected final TableName createTable() throws IOException, InterruptedException {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
return tableName;
}
}

View File

@ -21,14 +21,11 @@ import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
@ -88,11 +85,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
@Test
public void testAddPeer() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@ -118,12 +111,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@ -159,11 +147,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
.setReplicateAllUserTables(false).setSerial(true).build();
UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@ -190,11 +174,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
@Test
public void testDisabledTable() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));

View File

@ -65,11 +65,7 @@ public class TestSerialReplication extends SerialReplicationTestBase {
@Test
public void testRegionMove() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@ -89,11 +85,7 @@ public class TestSerialReplication extends SerialReplicationTestBase {
@Test
public void testRegionSplit() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@ -148,7 +140,7 @@ public class TestSerialReplication extends SerialReplicationTestBase {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName)
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build(),
new byte[][] { splitKey });
@ -204,4 +196,58 @@ public class TestSerialReplication extends SerialReplicationTestBase {
assertEquals(200, count);
}
}
@Test
public void testRemovePeerNothingReplicated() throws Exception {
TableName tableName = createTable();
String encodedRegionName =
UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
ReplicationQueueStorage queueStorage =
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
UTIL.getAdmin().removeReplicationPeer(PEER_ID);
assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
}
@Test
public void testRemovePeer() throws Exception {
TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
enablePeerAndWaitUntilReplicationDone(100);
checkOrder(100);
String encodedRegionName =
UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
ReplicationQueueStorage queueStorage =
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
UTIL.getAdmin().removeReplicationPeer(PEER_ID);
// confirm that we delete the last pushed sequence id
assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
}
@Test
public void testRemoveSerialFlag() throws Exception {
TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
enablePeerAndWaitUntilReplicationDone(100);
checkOrder(100);
String encodedRegionName =
UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
ReplicationQueueStorage queueStorage =
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
ReplicationPeerConfig peerConfig = UTIL.getAdmin().getReplicationPeerConfig(PEER_ID);
UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
ReplicationPeerConfig.newBuilder(peerConfig).setSerial(false).build());
// confirm that we delete the last pushed sequence id
assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
}
}