HBASE-27783 Addendum forward port the test improvement when backporting to branch-2

This commit is contained in:
Duo Zhang 2023-04-20 15:11:22 +08:00
parent 398c5ef313
commit f5ee958ead
1 changed files with 40 additions and 4 deletions

View File

@ -23,8 +23,11 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -40,11 +43,17 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ MasterTests.class, LargeTests.class }) @Category({ MasterTests.class, LargeTests.class })
public class TestDisablePeerModification { public class TestDisablePeerModification {
@ -54,9 +63,9 @@ public class TestDisablePeerModification {
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static CountDownLatch ARRIVE = new CountDownLatch(1); private static volatile CountDownLatch ARRIVE;
private static CountDownLatch RESUME = new CountDownLatch(1); private static volatile CountDownLatch RESUME;
public static final class MockPeerStorage extends FSReplicationPeerStorage { public static final class MockPeerStorage extends FSReplicationPeerStorage {
@ -77,6 +86,14 @@ public class TestDisablePeerModification {
} }
} }
@Parameter
public boolean async;
@Parameters(name = "{index}: async={0}")
public static List<Object[]> params() {
return Arrays.asList(new Object[] { true }, new Object[] { false });
}
@BeforeClass @BeforeClass
public static void setUp() throws Exception { public static void setUp() throws Exception {
UTIL.getConfiguration().setClass(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, UTIL.getConfiguration().setClass(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
@ -89,18 +106,37 @@ public class TestDisablePeerModification {
UTIL.shutdownMiniCluster(); UTIL.shutdownMiniCluster();
} }
@Before
public void setUpBeforeTest() throws IOException {
UTIL.getAdmin().replicationPeerModificationSwitch(true, true);
}
@Test @Test
public void testDrainProcs() throws Exception { public void testDrainProcs() throws Exception {
ARRIVE = new CountDownLatch(1);
RESUME = new CountDownLatch(1);
AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin(); AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin();
ReplicationPeerConfig rpc = ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test") ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build(); .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
CompletableFuture<Void> addFuture = admin.addReplicationPeer("test_peer", rpc); CompletableFuture<Void> addFuture = admin.addReplicationPeer("test_peer_" + async, rpc);
ARRIVE.await(); ARRIVE.await();
// we have a pending add peer procedure which has already passed the first state, let's issue a // we have a pending add peer procedure which has already passed the first state, let's issue a
// peer modification switch request to disable peer modification and set drainProcs to true // peer modification switch request to disable peer modification and set drainProcs to true
CompletableFuture<Boolean> switchFuture = admin.replicationPeerModificationSwitch(false, true); CompletableFuture<Boolean> switchFuture;
if (async) {
switchFuture = admin.replicationPeerModificationSwitch(false, true);
} else {
switchFuture = new CompletableFuture<>();
ForkJoinPool.commonPool().submit(() -> {
try {
switchFuture.complete(UTIL.getAdmin().replicationPeerModificationSwitch(false, true));
} catch (IOException e) {
switchFuture.completeExceptionally(e);
}
});
}
// sleep a while, the switchFuture should not finish yet // sleep a while, the switchFuture should not finish yet
// the sleep is necessary as we can not join on the switchFuture, so there is no stable way to // the sleep is necessary as we can not join on the switchFuture, so there is no stable way to