This closes #1082
This commit is contained in:
commit
c1c0354d92
|
@ -109,9 +109,15 @@ public class ClusterController implements ActiveMQComponent {
|
||||||
//latch so we know once we are connected
|
//latch so we know once we are connected
|
||||||
replicationClusterConnectedLatch = new CountDownLatch(1);
|
replicationClusterConnectedLatch = new CountDownLatch(1);
|
||||||
//and add the quorum manager as a topology listener
|
//and add the quorum manager as a topology listener
|
||||||
defaultLocator.addClusterTopologyListener(quorumManager);
|
if (defaultLocator != null) {
|
||||||
//start the quorum manager
|
defaultLocator.addClusterTopologyListener(quorumManager);
|
||||||
quorumManager.start();
|
}
|
||||||
|
|
||||||
|
if (quorumManager != null) {
|
||||||
|
//start the quorum manager
|
||||||
|
quorumManager.start();
|
||||||
|
}
|
||||||
|
|
||||||
started = true;
|
started = true;
|
||||||
//connect all the locators in a separate thread
|
//connect all the locators in a separate thread
|
||||||
for (ServerLocatorInternal serverLocatorInternal : locators.values()) {
|
for (ServerLocatorInternal serverLocatorInternal : locators.values()) {
|
||||||
|
|
|
@ -247,9 +247,7 @@ public final class ClusterManager implements ActiveMQComponent {
|
||||||
/*
|
/*
|
||||||
* only start if we are actually in a cluster
|
* only start if we are actually in a cluster
|
||||||
* */
|
* */
|
||||||
if (clusterConnections.size() > 0) {
|
clusterController.start();
|
||||||
clusterController.start();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -2420,6 +2420,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
long txID = storageManager.generateID();
|
long txID = storageManager.generateID();
|
||||||
storageManager.deleteAddressBinding(txID, addressInfo.getId());
|
storageManager.deleteAddressBinding(txID, addressInfo.getId());
|
||||||
storageManager.commitBindings(txID);
|
storageManager.commitBindings(txID);
|
||||||
|
pagingManager.deletePageStore(address);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
||||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||||
import org.apache.activemq.artemis.core.filter.Filter;
|
import org.apache.activemq.artemis.core.filter.Filter;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
|
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
|
||||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||||
|
@ -2968,6 +2969,7 @@ public class QueueImpl implements Queue {
|
||||||
Iterator lastIterator = null;
|
Iterator lastIterator = null;
|
||||||
|
|
||||||
MessageReference cachedNext = null;
|
MessageReference cachedNext = null;
|
||||||
|
HashSet<PagePosition> previouslyBrowsed = new HashSet();
|
||||||
|
|
||||||
private QueueBrowserIterator() {
|
private QueueBrowserIterator() {
|
||||||
messagesIterator = new SynchronizedIterator(messageReferences.iterator());
|
messagesIterator = new SynchronizedIterator(messageReferences.iterator());
|
||||||
|
@ -3003,15 +3005,21 @@ public class QueueImpl implements Queue {
|
||||||
while (true) {
|
while (true) {
|
||||||
if (messagesIterator != null && messagesIterator.hasNext()) {
|
if (messagesIterator != null && messagesIterator.hasNext()) {
|
||||||
MessageReference msg = messagesIterator.next();
|
MessageReference msg = messagesIterator.next();
|
||||||
|
if (msg.isPaged()) {
|
||||||
|
previouslyBrowsed.add(((PagedReference)msg).getPosition());
|
||||||
|
}
|
||||||
return msg;
|
return msg;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (getPagingIterator() != null) {
|
if (getPagingIterator() != null) {
|
||||||
if (getPagingIterator().hasNext()) {
|
while (getPagingIterator().hasNext()) {
|
||||||
lastIterator = getPagingIterator();
|
lastIterator = getPagingIterator();
|
||||||
MessageReference ref = getPagingIterator().next();
|
PagedReference ref = getPagingIterator().next();
|
||||||
|
if (previouslyBrowsed.contains(ref.getPosition())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
return ref;
|
return ref;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -111,9 +112,9 @@ public class JmsNettyNioStressTest extends ActiveMQTestBase {
|
||||||
final int numberOfMessages = 100;
|
final int numberOfMessages = 100;
|
||||||
|
|
||||||
// these must all be the same
|
// these must all be the same
|
||||||
final int numProducers = 30;
|
final int numProducers = 5;
|
||||||
final int numConsumerProducers = 30;
|
final int numConsumerProducers = 5;
|
||||||
final int numConsumers = 30;
|
final int numConsumers = 5;
|
||||||
|
|
||||||
// each produce, consume+produce and consume increments this counter
|
// each produce, consume+produce and consume increments this counter
|
||||||
final AtomicInteger totalCount = new AtomicInteger(0);
|
final AtomicInteger totalCount = new AtomicInteger(0);
|
||||||
|
@ -261,15 +262,8 @@ public class JmsNettyNioStressTest extends ActiveMQTestBase {
|
||||||
}.start();
|
}.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
// check that the overall transaction count reaches the expected number,
|
Wait.waitFor(() -> totalExpectedCount == totalCount.get(), 60000, 100);
|
||||||
// which would indicate that the system didn't stall
|
|
||||||
int timeoutCounter = 0;
|
|
||||||
int maxSecondsToWait = 60;
|
|
||||||
while (timeoutCounter < maxSecondsToWait && totalCount.get() < totalExpectedCount) {
|
|
||||||
timeoutCounter++;
|
|
||||||
Thread.sleep(1000);
|
|
||||||
System.out.println("Not done yet.. " + (maxSecondsToWait - timeoutCounter) + "; " + totalCount.get());
|
|
||||||
}
|
|
||||||
System.out.println("Done.." + totalCount.get() + ", expected " + totalExpectedCount);
|
System.out.println("Done.." + totalCount.get() + ", expected " + totalExpectedCount);
|
||||||
Assert.assertEquals("Possible deadlock", totalExpectedCount, totalCount.get());
|
Assert.assertEquals("Possible deadlock", totalExpectedCount, totalCount.get());
|
||||||
System.out.println("After assert");
|
System.out.println("After assert");
|
||||||
|
|
|
@ -135,12 +135,13 @@ public class TemporaryQueueTest extends SingleServerTestBase {
|
||||||
assertTrue(Arrays.asList(storeNames).contains(address));
|
assertTrue(Arrays.asList(storeNames).contains(address));
|
||||||
|
|
||||||
consumer.close();
|
consumer.close();
|
||||||
|
|
||||||
session.deleteQueue(queue);
|
session.deleteQueue(queue);
|
||||||
|
session.close();
|
||||||
|
|
||||||
storeNames = server.getPagingManager().getStoreNames();
|
storeNames = server.getPagingManager().getStoreNames();
|
||||||
assertFalse(Arrays.asList(storeNames).contains(address));
|
assertFalse(Arrays.asList(storeNames).contains(address));
|
||||||
|
|
||||||
session.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -267,7 +267,6 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
ActiveMQServerControl serverControl = createManagementControl();
|
ActiveMQServerControl serverControl = createManagementControl();
|
||||||
|
|
||||||
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
|
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
|
||||||
serverControl.createAddress(address.toString(), "ANYCAST");
|
|
||||||
serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, purgeOnNoConsumers, autoCreateAddress);
|
serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, purgeOnNoConsumers, autoCreateAddress);
|
||||||
|
|
||||||
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
|
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
|
||||||
|
@ -404,11 +403,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
// management operations
|
// management operations
|
||||||
|
|
||||||
Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
|
Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
|
||||||
serverControl.createAddress(address.toString(), "ANYCAST");
|
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, true);
|
||||||
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false);
|
|
||||||
Assert.assertTrue(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
|
Assert.assertTrue(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
|
||||||
|
|
||||||
serverControl.destroyQueue(name.toString());
|
serverControl.destroyQueue(name.toString(), true, true);
|
||||||
Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
|
Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1427,7 +1425,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
SimpleString address = new SimpleString("testQueue");
|
SimpleString address = new SimpleString("testQueue");
|
||||||
HashMap<String, Object> params = new HashMap<>();
|
HashMap<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.SERVER_ID_PROP_NAME, "2");
|
params.put(TransportConstants.SERVER_ID_PROP_NAME, "2");
|
||||||
Configuration config = createDefaultInVMConfig(2).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName(), params));
|
Configuration config = createDefaultInVMConfig(2).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName(), params)).setSecurityEnabled(false);
|
||||||
ActiveMQServer server2 = addServer(ActiveMQServers.newActiveMQServer(config, null, true));
|
ActiveMQServer server2 = addServer(ActiveMQServers.newActiveMQServer(config, null, true));
|
||||||
|
|
||||||
this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params));
|
this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params));
|
||||||
|
|
|
@ -186,9 +186,9 @@ public class PagingOrderTest extends ActiveMQTestBase {
|
||||||
ClientSession session = sf.createSession(false, false, false);
|
ClientSession session = sf.createSession(false, false, false);
|
||||||
|
|
||||||
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||||
Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
Queue q1 = server.createQueue(ADDRESS, RoutingType.MULTICAST, ADDRESS, null, true, false);
|
||||||
|
|
||||||
Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false);
|
Queue q2 = server.createQueue(ADDRESS, RoutingType.MULTICAST, new SimpleString("inactive"), null, true, false);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||||
|
|
||||||
|
@ -316,9 +316,9 @@ public class PagingOrderTest extends ActiveMQTestBase {
|
||||||
ClientSession session = sf.createSession(false, false, false);
|
ClientSession session = sf.createSession(false, false, false);
|
||||||
|
|
||||||
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||||
Queue q1 = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
Queue q1 = server.createQueue(ADDRESS, RoutingType.MULTICAST, ADDRESS, null, true, false);
|
||||||
|
|
||||||
Queue q2 = server.createQueue(ADDRESS, RoutingType.ANYCAST, new SimpleString("inactive"), null, true, false);
|
Queue q2 = server.createQueue(ADDRESS, RoutingType.MULTICAST, new SimpleString("inactive"), null, true, false);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue