HBASE-19943 Only allow removing sync replication peer which is in DA state

This commit is contained in:
huzheng 2018-03-01 18:34:02 +08:00 committed by zhangduo
parent 0c97cda2a9
commit 45794d4156
4 changed files with 78 additions and 3 deletions

View File

@ -120,8 +120,20 @@ public class ReplicationPeerManager {
return desc; return desc;
} }
private void checkPeerInDAStateIfSyncReplication(String peerId) throws DoNotRetryIOException {
ReplicationPeerDescription desc = peers.get(peerId);
if (desc != null && desc.getPeerConfig().isSyncReplication()
&& !SyncReplicationState.DOWNGRADE_ACTIVE.equals(desc.getSyncReplicationState())) {
throw new DoNotRetryIOException("Couldn't remove synchronous replication peer with state="
+ desc.getSyncReplicationState()
+ ", Transit the synchronous replication state to be DOWNGRADE_ACTIVE firstly.");
}
}
ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException { ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
return checkPeerExists(peerId).getPeerConfig(); ReplicationPeerDescription pd = checkPeerExists(peerId);
checkPeerInDAStateIfSyncReplication(peerId);
return pd.getPeerConfig();
} }
void preEnablePeer(String peerId) throws DoNotRetryIOException { void preEnablePeer(String peerId) throws DoNotRetryIOException {

View File

@ -142,7 +142,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
@Override @Override
public WAL getWAL(RegionInfo region) throws IOException { public WAL getWAL(RegionInfo region) throws IOException {
if (region == null) { if (region == null) {
return provider.getWAL(region); return provider.getWAL(null);
} }
Optional<Pair<String, String>> peerIdAndRemoteWALDir = Optional<Pair<String, String>> peerIdAndRemoteWALDir =
peerInfoProvider.getPeerIdAndRemoteWALDir(region); peerInfoProvider.getPeerIdAndRemoteWALDir(region);

View File

@ -253,6 +253,62 @@ public class TestReplicationAdmin {
assertEquals(0, hbaseAdmin.listReplicationPeers().size()); assertEquals(0, hbaseAdmin.listReplicationPeers().size());
} }
@Test
public void testRemovePeerWithNonDAState() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
TEST_UTIL.createTable(tableName, Bytes.toBytes("family"));
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
String rootDir = "hdfs://srv1:9999/hbase";
builder.setClusterKey(KEY_ONE);
builder.setRemoteWALDir(rootDir);
builder.setReplicateAllUserTables(false);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, new ArrayList<>());
builder.setTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
// Transit sync replication state to ACTIVE.
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, SyncReplicationState.ACTIVE);
assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
try {
hbaseAdmin.removeReplicationPeer(ID_ONE);
fail("Can't remove a synchronous replication peer with state=ACTIVE");
} catch (IOException e) {
// OK
}
// Transit sync replication state to DA
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE,
SyncReplicationState.DOWNGRADE_ACTIVE);
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
// Transit sync replication state to STANDBY
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, SyncReplicationState.STANDBY);
assertEquals(SyncReplicationState.STANDBY,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
try {
hbaseAdmin.removeReplicationPeer(ID_ONE);
fail("Can't remove a synchronous replication peer with state=STANDBY");
} catch (IOException e) {
// OK
}
// Transit sync replication state to DA
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE,
SyncReplicationState.DOWNGRADE_ACTIVE);
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
hbaseAdmin.removeReplicationPeer(ID_ONE);
assertEquals(0, hbaseAdmin.listReplicationPeers().size());
}
@Test @Test
public void testAddPeerWithState() throws Exception { public void testAddPeerWithState() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
@ -1072,5 +1128,12 @@ public class TestReplicationAdmin {
} catch (Exception e) { } catch (Exception e) {
// OK // OK
} }
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
SyncReplicationState.DOWNGRADE_ACTIVE);
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
hbaseAdmin.removeReplicationPeer(ID_ONE);
hbaseAdmin.removeReplicationPeer(ID_SECOND);
assertEquals(0, hbaseAdmin.listReplicationPeers().size());
} }
} }

View File

@ -109,7 +109,7 @@ public class TestSyncReplication {
UTIL1.startMiniCluster(3); UTIL1.startMiniCluster(3);
UTIL2.startMiniCluster(3); UTIL2.startMiniCluster(3);
TableDescriptor td = TableDescriptor td =
TableDescriptorBuilder.newBuilder(TABLE_NAME).addColumnFamily(ColumnFamilyDescriptorBuilder TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
UTIL1.getAdmin().createTable(td); UTIL1.getAdmin().createTable(td);
UTIL2.getAdmin().createTable(td); UTIL2.getAdmin().createTable(td);