ARTEMIS-4355 Update Curator to 5.5.0; Zookeeper 3.8.2

This commit is contained in:
Alexey Markevich 2023-07-19 20:33:05 +02:00 committed by Robbie Gemmell
parent 6796e548be
commit ce8163b780
5 changed files with 20 additions and 52 deletions

View File

@ -27,8 +27,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.quorum.DistributedLock; import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager; import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
@ -83,8 +81,6 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
} }
testingServer = new TestingCluster(clusterSpecs); testingServer = new TestingCluster(clusterSpecs);
testingServer.start(); testingServer.start();
// start waits for quorumPeer!=null but not that it has started...
Wait.waitFor(this::ensembleHasLeader);
connectString = testingServer.getConnectString(); connectString = testingServer.getConnectString();
super.setupEnv(); super.setupEnv();
} }
@ -207,7 +203,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
public void cannotStartManagerWithoutQuorum() throws Exception { public void cannotStartManagerWithoutQuorum() throws Exception {
Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1); Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1);
DistributedPrimitiveManager manager = createManagedDistributeManager(); DistributedPrimitiveManager manager = createManagedDistributeManager();
stopMajorityNotLeaderNodes(true); stopMajority(true);
Assert.assertFalse(manager.start(2, TimeUnit.SECONDS)); Assert.assertFalse(manager.start(2, TimeUnit.SECONDS));
Assert.assertFalse(manager.isStarted()); Assert.assertFalse(manager.isStarted());
} }
@ -217,7 +213,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1); Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1);
DistributedPrimitiveManager manager = createManagedDistributeManager(); DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start(); manager.start();
stopMajorityNotLeaderNodes(true); stopMajority(true);
DistributedLock lock = manager.getDistributedLock("a"); DistributedLock lock = manager.getDistributedLock("a");
lock.tryLock(); lock.tryLock();
} }
@ -227,7 +223,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1); Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1);
DistributedPrimitiveManager manager = createManagedDistributeManager(); DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start(); manager.start();
stopMajorityNotLeaderNodes(true); stopMajority(true);
DistributedLock lock = manager.getDistributedLock("a"); DistributedLock lock = manager.getDistributedLock("a");
final boolean held; final boolean held;
try { try {
@ -243,7 +239,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1); Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1);
DistributedPrimitiveManager manager = createManagedDistributeManager(); DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start(); manager.start();
stopMajorityNotLeaderNodes(true); stopMajority(true);
DistributedLock lock = manager.getDistributedLock("a"); DistributedLock lock = manager.getDistributedLock("a");
Assert.assertNotNull(lock); Assert.assertNotNull(lock);
} }
@ -256,7 +252,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
DistributedLock lock = manager.getDistributedLock("a"); DistributedLock lock = manager.getDistributedLock("a");
CountDownLatch unavailable = new CountDownLatch(1); CountDownLatch unavailable = new CountDownLatch(1);
lock.addListener(unavailable::countDown); lock.addListener(unavailable::countDown);
stopMajorityNotLeaderNodes(true); stopMajority(true);
Assert.assertTrue(unavailable.await(SESSION_MS + SERVER_TICK_MS, TimeUnit.MILLISECONDS)); Assert.assertTrue(unavailable.await(SESSION_MS + SERVER_TICK_MS, TimeUnit.MILLISECONDS));
} }
@ -270,7 +266,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
final AtomicInteger unavailableLock = new AtomicInteger(0); final AtomicInteger unavailableLock = new AtomicInteger(0);
manager.addUnavailableManagerListener(unavailableManager::incrementAndGet); manager.addUnavailableManagerListener(unavailableManager::incrementAndGet);
lock.addListener(unavailableLock::incrementAndGet); lock.addListener(unavailableLock::incrementAndGet);
stopMajorityNotLeaderNodes(true); stopMajority(true);
TimeUnit.MILLISECONDS.sleep(SESSION_MS + SERVER_TICK_MS + CONNECTION_MS); TimeUnit.MILLISECONDS.sleep(SESSION_MS + SERVER_TICK_MS + CONNECTION_MS);
Assert.assertEquals(1, unavailableLock.get()); Assert.assertEquals(1, unavailableLock.get());
Assert.assertEquals(1, unavailableManager.get()); Assert.assertEquals(1, unavailableManager.get());
@ -305,7 +301,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
timedLock.start(); timedLock.start();
Assert.assertTrue(startedTimedLock.await(10, TimeUnit.SECONDS)); Assert.assertTrue(startedTimedLock.await(10, TimeUnit.SECONDS));
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
stopMajorityNotLeaderNodes(true); stopMajority(true);
TimeUnit.MILLISECONDS.sleep(SESSION_MS + CONNECTION_MS); TimeUnit.MILLISECONDS.sleep(SESSION_MS + CONNECTION_MS);
Wait.waitFor(() -> unavailableLock.get() > 0, SERVER_TICK_MS); Wait.waitFor(() -> unavailableLock.get() > 0, SERVER_TICK_MS);
Assert.assertEquals(1, unavailableManager.get()); Assert.assertEquals(1, unavailableManager.get());
@ -323,7 +319,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
}; };
manager.addUnavailableManagerListener(managerListener); manager.addUnavailableManagerListener(managerListener);
Assert.assertFalse(unavailable.get()); Assert.assertFalse(unavailable.get());
stopMajorityNotLeaderNodes(true); stopMajority(true);
Wait.waitFor(unavailable::get); Wait.waitFor(unavailable::get);
manager.removeUnavailableManagerListener(managerListener); manager.removeUnavailableManagerListener(managerListener);
final AtomicInteger unavailableOnRegister = new AtomicInteger(); final AtomicInteger unavailableOnRegister = new AtomicInteger();
@ -336,21 +332,8 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
} }
} }
private boolean ensembleHasLeader() { private void stopMajority(boolean fromLast) throws Exception {
return testingServer.getServers().stream().filter(CuratorDistributedLockTest::isLeader).count() != 0; List<TestingZooKeeperServer> followers = testingServer.getServers();
}
private static boolean isLeader(TestingZooKeeperServer server) {
if (server.getInstanceSpecs().size() == 1) {
return true;
}
long leaderId = server.getQuorumPeer().getLeaderId();
long id = server.getQuorumPeer().getId();
return id == leaderId;
}
private void stopMajorityNotLeaderNodes(boolean fromLast) throws Exception {
List<TestingZooKeeperServer> followers = testingServer.getServers().stream().filter(Predicate.not(CuratorDistributedLockTest::isLeader)).collect(Collectors.toList());
final int quorum = (zkNodes / 2) + 1; final int quorum = (zkNodes / 2) + 1;
for (int i = 0; i < quorum; i++) { for (int i = 0; i < quorum; i++) {
final int nodeIndex = fromLast ? (followers.size() - 1) - i : i; final int nodeIndex = fromLast ? (followers.size() - 1) - i : i;

12
pom.xml
View File

@ -114,8 +114,8 @@
<jctools.version>2.1.2</jctools.version> <jctools.version>2.1.2</jctools.version>
<netty.version>4.1.96.Final</netty.version> <netty.version>4.1.96.Final</netty.version>
<hdrhistogram.version>2.1.12</hdrhistogram.version> <hdrhistogram.version>2.1.12</hdrhistogram.version>
<curator.version>5.2.0</curator.version> <curator.version>5.5.0</curator.version>
<zookeeper.version>3.6.3</zookeeper.version> <zookeeper.version>3.8.2</zookeeper.version>
<woodstox.version>4.4.0</woodstox.version> <woodstox.version>4.4.0</woodstox.version>
<!-- docs --> <!-- docs -->
@ -982,12 +982,8 @@
<version>${zookeeper.version}</version> <version>${zookeeper.version}</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>ch.qos.logback</groupId>
<artifactId>slf4j-log4j12</artifactId> <artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion> </exclusion>
<exclusion> <exclusion>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>

View File

@ -19,7 +19,7 @@ package org.apache.activemq.artemis.tests.smoke.quorum;
import javax.management.remote.JMXServiceURL; import javax.management.remote.JMXServiceURL;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedList; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -128,12 +128,12 @@ public abstract class PluggableQuorumSinglePairTest extends SmokeTestBase {
protected BrokerControl primary; protected BrokerControl primary;
protected BrokerControl backup; protected BrokerControl backup;
protected LinkedList<BrokerControl> brokers; protected List<BrokerControl> brokers;
public PluggableQuorumSinglePairTest(String brokerFolderPrefix) { public PluggableQuorumSinglePairTest(String brokerFolderPrefix) {
primary = new BrokerControl("primary", JMX_PORT_PRIMARY, brokerFolderPrefix + PRIMARY_DATA_FOLDER, PRIMARY_PORT_OFFSET); primary = new BrokerControl("primary", JMX_PORT_PRIMARY, brokerFolderPrefix + PRIMARY_DATA_FOLDER, PRIMARY_PORT_OFFSET);
backup = new BrokerControl("backup", JMX_PORT_BACKUP, brokerFolderPrefix + BACKUP_DATA_FOLDER, BACKUP_PORT_OFFSET); backup = new BrokerControl("backup", JMX_PORT_BACKUP, brokerFolderPrefix + BACKUP_DATA_FOLDER, BACKUP_PORT_OFFSET);
brokers = new LinkedList(Arrays.asList(primary, backup)); brokers = Arrays.asList(primary, backup);
} }
protected abstract boolean awaitAsyncSetupCompleted(long timeout, TimeUnit unit) throws InterruptedException; protected abstract boolean awaitAsyncSetupCompleted(long timeout, TimeUnit unit) throws InterruptedException;

View File

@ -17,13 +17,13 @@
package org.apache.activemq.artemis.tests.smoke.quorum; package org.apache.activemq.artemis.tests.smoke.quorum;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedList;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.util.ServerUtil; import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -48,15 +48,14 @@ public class ZookeeperPluggableQuorumPeerTest extends ZookeeperPluggableQuorumSi
// both roles as both wish to be primary but will revert to backup // both roles as both wish to be primary but will revert to backup
primary = new BrokerControl("primary-peer-a", JMX_PORT_PRIMARY, "zkReplicationPrimaryPeerA", PRIMARY_PORT_OFFSET); primary = new BrokerControl("primary-peer-a", JMX_PORT_PRIMARY, "zkReplicationPrimaryPeerA", PRIMARY_PORT_OFFSET);
backup = new BrokerControl("primary-peer-b", JMX_PORT_BACKUP, "zkReplicationPrimaryPeerB", BACKUP_PORT_OFFSET); backup = new BrokerControl("primary-peer-b", JMX_PORT_BACKUP, "zkReplicationPrimaryPeerB", BACKUP_PORT_OFFSET);
brokers = new LinkedList(Arrays.asList(primary, backup)); brokers = Arrays.asList(primary, backup);
} }
@Ignore
@Test @Test
@Override @Override
public void testBackupFailoverAndPrimaryFailback() throws Exception { public void testBackupFailoverAndPrimaryFailback() throws Exception {
// peers don't request fail back by default // peers don't request fail back by default
// just wait for setup to avoid partial stop of zk via fast tear down with async setup
Wait.waitFor(this::ensembleHasLeader);
} }
@Test @Test

View File

@ -78,16 +78,6 @@ public class ZookeeperPluggableQuorumSinglePairTest extends PluggableQuorumSingl
return true; return true;
} }
protected boolean ensembleHasLeader() {
return testingServer.getServers().stream().filter(ZookeeperPluggableQuorumSinglePairTest::isLeader).count() != 0;
}
private static boolean isLeader(TestingZooKeeperServer server) {
long leaderId = server.getQuorumPeer().getLeaderId();
long id = server.getQuorumPeer().getId();
return id == leaderId;
}
@Override @Override
protected int[] stopMajority() throws Exception { protected int[] stopMajority() throws Exception {
List<TestingZooKeeperServer> followers = testingServer.getServers(); List<TestingZooKeeperServer> followers = testingServer.getServers();