HBASE-21277 Prevent to add same table to two sync replication peer's config
commit a1f28f3ca7
parent f122328758
AddPeerProcedure.java:

@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -79,13 +80,24 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
     return peerConfig;
   }
 
+  @Override
+  protected void releaseLatch(MasterProcedureEnv env) {
+    if (peerConfig.isSyncReplication()) {
+      env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
+    }
+    ProcedurePrepareLatch.releaseLatch(latch, this);
+  }
+
   @Override
   protected void prePeerModification(MasterProcedureEnv env)
-      throws IOException, ReplicationException {
+      throws IOException, ReplicationException, InterruptedException {
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.preAddReplicationPeer(peerId, peerConfig);
     }
+    if (peerConfig.isSyncReplication()) {
+      env.getReplicationPeerManager().acquireSyncReplicationPeerLock();
+    }
     env.getReplicationPeerManager().preAddPeer(peerId, peerConfig);
   }
 
ModifyPeerProcedure.java:

@@ -75,7 +75,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
    * all checks passes then the procedure can not be rolled back any more.
    */
   protected abstract void prePeerModification(MasterProcedureEnv env)
-      throws IOException, ReplicationException;
+      throws IOException, ReplicationException, InterruptedException;
 
   protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
 
@@ -91,7 +91,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
   protected abstract void postPeerModification(MasterProcedureEnv env)
       throws IOException, ReplicationException;
 
-  private void releaseLatch() {
+  protected void releaseLatch(MasterProcedureEnv env) {
     ProcedurePrepareLatch.releaseLatch(latch, this);
   }
 
@@ -241,7 +241,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
 
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
-      throws ProcedureSuspendedException {
+      throws ProcedureSuspendedException, InterruptedException {
     switch (state) {
       case PRE_PEER_MODIFICATION:
         try {
@@ -250,7 +250,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
           LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
             "mark the procedure as failure and give up", getClass().getName(), peerId, e);
           setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
-          releaseLatch();
+          releaseLatch(env);
           return Flow.NO_MORE_STATE;
         } catch (ReplicationException e) {
           long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
@@ -330,7 +330,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
           LOG.warn("{} failed to call post CP hook for peer {}, " +
             "ignore since the procedure has already done", getClass().getName(), peerId, e);
         }
-        releaseLatch();
+        releaseLatch(env);
         return Flow.NO_MORE_STATE;
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
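The signature changes above follow from the new locking scheme: AddPeerProcedure takes a manager-level lock in prePeerModification (acquiring it can block, hence the added InterruptedException), and the lock must be returned wherever the prepare latch is released, so releaseLatch is now overridable and receives the MasterProcedureEnv. A minimal, standalone sketch of that acquire-in-pre-check / release-with-the-latch pairing (a toy model, not HBase code):

import java.util.concurrent.Semaphore;

// Toy model of the lock lifecycle introduced by this patch: the "procedure" takes the
// manager's single permit before its pre-checks and gives it back when the prepare
// latch is released, whether the pre-checks succeeded or failed.
public class PeerLockLifecycleSketch {

  // Stands in for ReplicationPeerManager's syncReplicationPeerLock.
  static final Semaphore SYNC_PEER_LOCK = new Semaphore(1);

  // Mirrors AddPeerProcedure.prePeerModification: acquiring may block, hence
  // InterruptedException now appears in the signature.
  static void prePeerModification(boolean syncReplication) throws InterruptedException {
    if (syncReplication) {
      SYNC_PEER_LOCK.acquire();
    }
    // ... CP hook and preAddPeer checks would run here ...
  }

  // Mirrors AddPeerProcedure.releaseLatch(env): the single place the permit is returned,
  // reached both on pre-check failure and after the peer has been added.
  static void releaseLatch(boolean syncReplication) {
    if (syncReplication) {
      SYNC_PEER_LOCK.release();
    }
    // ... ProcedurePrepareLatch.releaseLatch(latch, this) in the real code ...
  }

  public static void main(String[] args) throws InterruptedException {
    prePeerModification(true);
    releaseLatch(true);
    System.out.println("permits available: " + SYNC_PEER_LOCK.availablePermits()); // 1
  }
}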
ReplicationPeerManager.java:

@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
@@ -75,6 +76,9 @@ public class ReplicationPeerManager {
       SyncReplicationState.DOWNGRADE_ACTIVE,
       EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));
 
+  // Only allow to add one sync replication peer concurrently
+  private final Semaphore syncReplicationPeerLock = new Semaphore(1);
+
   ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
       ConcurrentMap<String, ReplicationPeerDescription> peers) {
     this.peerStorage = peerStorage;
@@ -105,6 +109,9 @@
       throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
     }
     checkPeerConfig(peerConfig);
+    if (peerConfig.isSyncReplication()) {
+      checkSyncReplicationPeerConfigConflict(peerConfig);
+    }
     if (peers.containsKey(peerId)) {
       throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
     }
@@ -385,6 +392,7 @@
          "Only support replicated table config for sync replication peer");
       }
     }
+
     Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
     if (!remoteWALDir.isAbsolute()) {
       throw new DoNotRetryIOException(
@@ -397,6 +405,19 @@
     }
   }
 
+  private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
+      throws DoNotRetryIOException {
+    for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
+      for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) {
+        ReplicationPeerConfig rpc = entry.getValue().getPeerConfig();
+        if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) {
+          throw new DoNotRetryIOException(
+            "Table " + tableName + " has been replicated by peer " + entry.getKey());
+        }
+      }
+    }
+  }
+
   /**
    * Set a namespace in the peer config means that all tables in this namespace will be replicated
    * to the peer cluster.
@@ -493,4 +514,12 @@
     }
     return s1.equals(s2);
   }
+
+  public void acquireSyncReplicationPeerLock() throws InterruptedException {
+    syncReplicationPeerLock.acquire();
+  }
+
+  public void releaseSyncReplicationPeerLock() {
+    syncReplicationPeerLock.release();
+  }
 }
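Two guards are added on the manager side: a single-permit Semaphore so only one sync replication peer can be in the add path at a time, and checkSyncReplicationPeerConfigConflict, which rejects a new sync peer whose table-CFs map overlaps a table already claimed by an existing sync peer. The semaphore closes the window in which two concurrent adds could both pass the check. A toy version of the conflict check alone (illustrative names, not HBase code):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy version of checkSyncReplicationPeerConfigConflict: a new sync peer is rejected
// if any table it claims is already present in an existing sync peer's table-CFs map.
public class SyncPeerConflictCheckSketch {

  // Existing peerId -> tables that peer replicates synchronously.
  private final Map<String, Set<String>> syncPeers = new HashMap<>();

  void register(String peerId, Set<String> tables) {
    checkConflict(tables);
    syncPeers.put(peerId, tables);
  }

  private void checkConflict(Set<String> tables) {
    for (Map.Entry<String, Set<String>> entry : syncPeers.entrySet()) {
      for (String table : tables) {
        if (entry.getValue().contains(table)) {
          throw new IllegalStateException(
              "Table " + table + " has been replicated by peer " + entry.getKey());
        }
      }
    }
  }

  public static void main(String[] args) {
    SyncPeerConflictCheckSketch sketch = new SyncPeerConflictCheckSketch();
    sketch.register("p1", Set.of("t1"));
    try {
      sketch.register("p2", Set.of("t1")); // same table, different peer
    } catch (IllegalStateException expected) {
      System.out.println(expected.getMessage()); // Table t1 has been replicated by peer p1
    }
  }
}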
TestReplicationAdminForSyncReplication.java (new file):

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({MediumTests.class, ClientTests.class})
+public class TestReplicationAdminForSyncReplication {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationAdminForSyncReplication.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReplicationAdminForSyncReplication.class);
+
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  private static Admin hbaseAdmin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    TEST_UTIL.startMiniCluster();
+    hbaseAdmin = TEST_UTIL.getAdmin();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    hbaseAdmin.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testAddPeerWithSameTable() throws Exception {
+    TableName tableName = TableName.valueOf("testAddPeerWithSameTable");
+    TEST_UTIL.createTable(tableName, Bytes.toBytes("family"));
+
+    boolean[] success = { true, true, true, true, true, true };
+    Thread[] threads = new Thread[5];
+    for (int i = 0; i < 5; i++) {
+      String peerId = "id" + i;
+      String clusterKey = "127.0.0.1:2181:/hbase" + i;
+      int index = i;
+      threads[i] = new Thread(() -> {
+        try {
+          hbaseAdmin
+              .addReplicationPeer(peerId, buildSyncReplicationPeerConfig(clusterKey, tableName));
+        } catch (IOException e) {
+          LOG.error("Failed to add replication peer " + peerId);
+          success[index] = false;
+        }
+      });
+    }
+    for (int i = 0; i < 5; i++) {
+      threads[i].start();
+    }
+    for (int i = 0; i < 5; i++) {
+      threads[i].join();
+    }
+
+    int successCount = 0;
+    for (int i = 0; i < 5; i++) {
+      if (success[i]) {
+        successCount++;
+      }
+    }
+    assertEquals("Only one peer can be added successfully", 1, successCount);
+  }
+
+  private ReplicationPeerConfig buildSyncReplicationPeerConfig(String clusterKey,
+      TableName tableName) throws IOException {
+    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
+    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
+    builder.setClusterKey(clusterKey);
+    builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
+        TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString());
+    builder.setReplicateAllUserTables(false);
+    Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, new ArrayList<>());
+    builder.setTableCFsMap(tableCfs);
+    return builder.build();
+  }
+}
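For reference, the single-client shape of the behavior the test asserts could look like the sketch below. It reuses only API that appears in the diff (Admin.addReplicationPeer and the ReplicationPeerConfig builder); the peer ids, cluster keys, remote WAL directories and table name are invented, and an already-connected Admin is assumed:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;

public class SameTableRejectionSketch {

  // Builds a sync replication peer config claiming a single table, in the same
  // non-chained builder style the test uses. All literal values are made up.
  static ReplicationPeerConfig syncPeerConfig(String clusterKey, String remoteWALDir,
      TableName tableName) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    builder.setClusterKey(clusterKey);
    builder.setRemoteWALDir(remoteWALDir);
    builder.setReplicateAllUserTables(false);
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName, new ArrayList<>());
    builder.setTableCFsMap(tableCfs);
    return builder.build();
  }

  static void addConflictingPeers(Admin admin) throws Exception {
    TableName table = TableName.valueOf("t1");
    // First sync peer claiming table t1: accepted.
    admin.addReplicationPeer("p1",
        syncPeerConfig("127.0.0.1:2181:/hbase1", "hdfs://peer/remoteWALs1", table));
    // Second sync peer claiming the same table: with this patch the master rejects it
    // with a DoNotRetryIOException ("Table t1 has been replicated by peer p1").
    admin.addReplicationPeer("p2",
        syncPeerConfig("127.0.0.1:2181:/hbase2", "hdfs://peer/remoteWALs2", table));
  }
}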