diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 772a9d67daf..a753d235bf1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -58,6 +59,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Unit testing of ReplicationAdmin @@ -69,6 +72,8 @@ public class TestReplicationAdmin { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationAdmin.class); + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -117,6 +122,70 @@ public class TestReplicationAdmin { } } + @Test + public void testConcurrentPeerOperations() throws Exception { + int threadNum = 5; + AtomicLong successCount = new AtomicLong(0); + + // Test concurrent add peer operation + Thread[] addPeers = new Thread[threadNum]; + for (int i = 0; i < threadNum; i++) { + addPeers[i] = new Thread(() -> { + try { + hbaseAdmin.addReplicationPeer(ID_ONE, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()); + successCount.incrementAndGet(); + } catch (Exception e) { + LOG.debug("Got exception when add replication peer", e); + } + }); + addPeers[i].start(); + } + for (Thread addPeer : addPeers) { + addPeer.join(); + } + assertEquals(1, successCount.get()); + + // Test concurrent remove peer operation + successCount.set(0); + Thread[] removePeers = new Thread[threadNum]; + for (int i = 0; i < threadNum; i++) { + removePeers[i] = new Thread(() -> { + try { + hbaseAdmin.removeReplicationPeer(ID_ONE); + successCount.incrementAndGet(); + } catch (Exception e) { + LOG.debug("Got exception when remove replication peer", e); + } + }); + removePeers[i].start(); + } + for (Thread removePeer : removePeers) { + removePeer.join(); + } + assertEquals(1, successCount.get()); + + // Test concurrent add peer operation again + successCount.set(0); + addPeers = new Thread[threadNum]; + for (int i = 0; i < threadNum; i++) { + addPeers[i] = new Thread(() -> { + try { + hbaseAdmin.addReplicationPeer(ID_ONE, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()); + successCount.incrementAndGet(); + } catch (Exception e) { + LOG.debug("Got exception when add replication peer", e); + } + }); + addPeers[i].start(); + } + for (Thread addPeer : addPeers) { + addPeer.join(); + } + assertEquals(1, successCount.get()); + } + @Test public void testAddInvalidPeer() { ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();