diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index b47dca985a..98b550e8c0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -109,9 +109,15 @@ public class ClusterController implements ActiveMQComponent { //latch so we know once we are connected replicationClusterConnectedLatch = new CountDownLatch(1); //and add the quorum manager as a topology listener - defaultLocator.addClusterTopologyListener(quorumManager); - //start the quorum manager - quorumManager.start(); + if (defaultLocator != null) { + defaultLocator.addClusterTopologyListener(quorumManager); + } + + if (quorumManager != null) { + //start the quorum manager + quorumManager.start(); + } + started = true; //connect all the locators in a separate thread for (ServerLocatorInternal serverLocatorInternal : locators.values()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index 7d0faba866..d2219c29bc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -247,9 +247,7 @@ public final class ClusterManager implements ActiveMQComponent { /* * only start if we are actually in a cluster * */ - if (clusterConnections.size() > 0) { - clusterController.start(); - } + clusterController.start(); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 2b5ecaf4f0..c31a32320b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2420,6 +2420,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { long txID = storageManager.generateID(); storageManager.deleteAddressBinding(txID, addressInfo.getId()); storageManager.commitBindings(txID); + pagingManager.deletePageStore(address); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 44c5ba4e22..e04cf47f9c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -49,6 +49,7 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; import org.apache.activemq.artemis.core.persistence.QueueStatus; @@ -2968,6 +2969,7 @@ public class QueueImpl implements Queue { Iterator lastIterator = null; MessageReference cachedNext = null; + HashSet previouslyBrowsed = new HashSet(); private QueueBrowserIterator() { messagesIterator = new SynchronizedIterator(messageReferences.iterator()); @@ -3003,15 +3005,21 @@ public class QueueImpl implements Queue { while (true) { if (messagesIterator != null && messagesIterator.hasNext()) { MessageReference msg = messagesIterator.next(); + if (msg.isPaged()) { + previouslyBrowsed.add(((PagedReference)msg).getPosition()); + } return msg; } else { break; } } if (getPagingIterator() != null) { - if (getPagingIterator().hasNext()) { + while (getPagingIterator().hasNext()) { lastIterator = getPagingIterator(); - MessageReference ref = getPagingIterator().next(); + PagedReference ref = getPagingIterator().next(); + if (previouslyBrowsed.contains(ref.getPosition())) { + continue; + } return ref; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java index ac1163d87a..081dc79b05 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Assert; import org.junit.Test; @@ -111,9 +112,9 @@ public class JmsNettyNioStressTest extends ActiveMQTestBase { final int numberOfMessages = 100; // these must all be the same - final int numProducers = 30; - final int numConsumerProducers = 30; - final int numConsumers = 30; + final int numProducers = 5; + final int numConsumerProducers = 5; + final int numConsumers = 5; // each produce, consume+produce and consume increments this counter final AtomicInteger totalCount = new AtomicInteger(0); @@ -261,15 +262,8 @@ public class JmsNettyNioStressTest extends ActiveMQTestBase { }.start(); } - // check that the overall transaction count reaches the expected number, - // which would indicate that the system didn't stall - int timeoutCounter = 0; - int maxSecondsToWait = 60; - while (timeoutCounter < maxSecondsToWait && totalCount.get() < totalExpectedCount) { - timeoutCounter++; - Thread.sleep(1000); - System.out.println("Not done yet.. " + (maxSecondsToWait - timeoutCounter) + "; " + totalCount.get()); - } + Wait.waitFor(() -> totalExpectedCount == totalCount.get(), 60000, 100); + System.out.println("Done.." + totalCount.get() + ", expected " + totalExpectedCount); Assert.assertEquals("Possible deadlock", totalExpectedCount, totalCount.get()); System.out.println("After assert"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java index 5be1638a83..86b194e612 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/TemporaryQueueTest.java @@ -135,12 +135,13 @@ public class TemporaryQueueTest extends SingleServerTestBase { assertTrue(Arrays.asList(storeNames).contains(address)); consumer.close(); + session.deleteQueue(queue); + session.close(); storeNames = server.getPagingManager().getStoreNames(); assertFalse(Arrays.asList(storeNames).contains(address)); - session.close(); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index eb83031679..7d84e1f721 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -267,7 +267,6 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); - serverControl.createAddress(address.toString(), "ANYCAST"); serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, purgeOnNoConsumers, autoCreateAddress); checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); @@ -404,11 +403,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase { // management operations Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); - serverControl.createAddress(address.toString(), "ANYCAST"); - serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, true); Assert.assertTrue(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); - serverControl.destroyQueue(name.toString()); + serverControl.destroyQueue(name.toString(), true, true); Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); } @@ -1427,7 +1425,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { SimpleString address = new SimpleString("testQueue"); HashMap params = new HashMap<>(); params.put(TransportConstants.SERVER_ID_PROP_NAME, "2"); - Configuration config = createDefaultInVMConfig(2).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName(), params)); + Configuration config = createDefaultInVMConfig(2).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName(), params)).setSecurityEnabled(false); ActiveMQServer server2 = addServer(ActiveMQServers.newActiveMQServer(config, null, true)); this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java index a0fe205440..bffcd911d8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java @@ -186,9 +186,9 @@ public class PagingOrderTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, false, false); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); - Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); + Queue q1 = server.createQueue(ADDRESS, RoutingType.MULTICAST, ADDRESS, null, true, false); - Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false); + Queue q2 = server.createQueue(ADDRESS, RoutingType.MULTICAST, new SimpleString("inactive"), null, true, false); ClientProducer producer = session.createProducer(PagingTest.ADDRESS); @@ -316,9 +316,9 @@ public class PagingOrderTest extends ActiveMQTestBase { ClientSession session = sf.createSession(false, false, false); server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST)); - Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); + Queue q1 = server.createQueue(ADDRESS, RoutingType.MULTICAST, ADDRESS, null, true, false); - Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false); + Queue q2 = server.createQueue(ADDRESS, RoutingType.MULTICAST, new SimpleString("inactive"), null, true, false); ClientProducer producer = session.createProducer(PagingTest.ADDRESS);