HBASE-20432 Cleanup related resources when remove a sync replication peer

This commit is contained in:
huzheng 2018-04-18 20:38:33 +08:00 committed by zhangduo
parent 1bea678ef8
commit 5b6c0d2777
5 changed files with 89 additions and 14 deletions

View File

@ -66,9 +66,19 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
env.getReplicationPeerManager().removePeer(peerId);
}
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
ReplaySyncReplicationWALManager remoteWALManager =
env.getMasterServices().getReplaySyncReplicationWALManager();
remoteWALManager.removePeerRemoteWALs(peerId);
remoteWALManager.removePeerReplayWALDir(peerId);
}
@Override
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
if (peerConfig.isSyncReplication()) {
removeRemoteWALs(env);
}
env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
if (peerConfig.isSerial()) {
env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);

View File

@ -115,6 +115,14 @@ public class ReplaySyncReplicationWALManager {
}
}
public void removePeerRemoteWALs(String peerId) throws IOException {
Path remoteWALDir = getPeerRemoteWALDir(peerId);
if (fs.exists(remoteWALDir) && !fs.delete(remoteWALDir, true)) {
throw new IOException(
"Failed to remove remote WALs dir " + remoteWALDir + " for peer id=" + peerId);
}
}
public void initPeerWorkers(String peerId) {
BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
services.getServerManager().getOnlineServers().keySet()

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
@ -71,6 +72,10 @@ public class SyncReplicationTestBase {
protected static String PEER_ID = "1";
protected static Path remoteWALDir1;
protected static Path remoteWALDir2;
private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
util.setZkCluster(ZK_UTIL.getZkCluster());
Configuration conf = util.getConfiguration();
@ -104,11 +109,11 @@ public class SyncReplicationTestBase {
UTIL2.getAdmin().createTable(td);
FileSystem fs1 = UTIL1.getTestFileSystem();
FileSystem fs2 = UTIL2.getTestFileSystem();
Path remoteWALDir1 =
new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
remoteWALDir1 =
new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
"remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
Path remoteWALDir2 =
new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
remoteWALDir2 =
new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
"remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
UTIL1.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
@ -188,7 +193,37 @@ public class SyncReplicationTestBase {
protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
return new Path(remoteWALDir, PEER_ID);
return getRemoteWALDir(remoteWALDir, peerId);
}
protected Path getRemoteWALDir(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}
protected Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId + "-replay");
}
protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility)
throws Exception {
ReplicationPeerStorage rps = ReplicationStorageFactory
.getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
try {
rps.getPeerSyncReplicationState(peerId);
fail("Should throw exception when get the sync replication state of a removed peer.");
} catch (NullPointerException e) {
// ignore.
}
try {
rps.getPeerNewSyncReplicationState(peerId);
fail("Should throw exception when get the new sync replication state of a removed peer");
} catch (NullPointerException e) {
// ignore.
}
try (FileSystem fs = utility.getTestFileSystem()) {
Assert.assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId)));
Assert.assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId)));
}
}
protected void verifyReplicationRequestRejection(HBaseTestingUtility utility,

View File

@ -58,7 +58,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
// Ensure that there's no cluster id in remote log entries.
verifyNoClusterIdInRemoteLog(UTIL2, PEER_ID);
verifyNoClusterIdInRemoteLog(UTIL2, remoteWALDir2, PEER_ID);
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
@ -84,12 +84,9 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
write(UTIL2, 200, 300);
}
private void verifyNoClusterIdInRemoteLog(HBaseTestingUtility utility, String peerId)
throws Exception {
private void verifyNoClusterIdInRemoteLog(HBaseTestingUtility utility, Path remoteDir,
String peerId) throws Exception {
FileSystem fs2 = utility.getTestFileSystem();
Path remoteDir =
new Path(utility.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
"remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId));
Assert.assertTrue(files.length > 0);
for (FileStatus file : files) {

View File

@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -84,13 +87,35 @@ public class TestSyncReplicationStandBy extends SyncReplicationTestBase {
assertDisallow(table,
t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
assertDisallow(table,
t -> t
.put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
t -> t.put(
Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
.add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
.add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
}
// We should still allow replication writes
writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
// Remove the peers in ACTIVE & STANDBY cluster.
FileSystem fs2 = remoteWALDir2.getFileSystem(UTIL2.getConfiguration());
Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
Assert.assertFalse(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
Assert.assertFalse(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID)));
UTIL1.getAdmin().removeReplicationPeer(PEER_ID);
verifyRemovedPeer(PEER_ID, remoteWALDir1, UTIL1);
// Peer remoteWAL dir will be renamed to replay WAL dir when transit from S to DA, and the
// replay WAL dir will be removed after replaying all WALs, so create a emtpy dir here to test
// whether the removeReplicationPeer would remove the remoteWAL dir.
fs2.create(getRemoteWALDir(remoteWALDir2, PEER_ID));
fs2.create(getReplayRemoteWALs(remoteWALDir2, PEER_ID));
Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
Assert.assertTrue(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID)));
UTIL2.getAdmin().removeReplicationPeer(PEER_ID);
verifyRemovedPeer(PEER_ID, remoteWALDir2, UTIL2);
}
}