NO JIRA - add some durable sub loadbalancing to the scenario test
This commit is contained in:
parent
0c5921eef6
commit
69e58322f1
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.artemis.tests.integration.jms.cluster;
|
package org.apache.activemq.artemis.tests.integration.jms.cluster;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
|
@ -58,9 +59,9 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
|
||||||
|
|
||||||
Topic topic1 = createTopic(TOPIC, true);
|
Topic topic1 = createTopic(TOPIC, true);
|
||||||
|
|
||||||
Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
|
||||||
Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
|
||||||
MessageProducer prod1 = session1.createProducer(null);
|
MessageProducer prod1 = session1.createProducer(null);
|
||||||
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
|
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
@ -75,9 +76,11 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
|
||||||
|
|
||||||
if (forcePaging) {
|
if (forcePaging) {
|
||||||
for (SimpleString psName : server1.getPagingManager().getStoreNames()) {
|
for (SimpleString psName : server1.getPagingManager().getStoreNames()) {
|
||||||
|
System.err.println("server1: force paging on:" + psName);
|
||||||
server1.getPagingManager().getPageStore(psName).startPaging();
|
server1.getPagingManager().getPageStore(psName).startPaging();
|
||||||
}
|
}
|
||||||
for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
|
for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
|
||||||
|
System.err.println("server2: force paging on:" + psName);
|
||||||
server2.getPagingManager().getPageStore(psName).startPaging();
|
server2.getPagingManager().getPageStore(psName).startPaging();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,10 +89,17 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
|
||||||
|
|
||||||
TextMessage m2 = (TextMessage) cons2.receive(5000);
|
TextMessage m2 = (TextMessage) cons2.receive(5000);
|
||||||
assertNotNull(m2);
|
assertNotNull(m2);
|
||||||
TextMessage m1 = (TextMessage) cons1.receive(5000);
|
assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
|
||||||
assertTrue(m1.getJMSDestination().toString().contains(TOPIC));
|
System.err.println("sub2 on 2, no ack, message txt:" + m2.getText());
|
||||||
|
|
||||||
|
|
||||||
|
TextMessage m1 = (TextMessage) cons1.receive(5000);
|
||||||
assertNotNull(m1);
|
assertNotNull(m1);
|
||||||
|
assertTrue(m1.getJMSDestination().toString().contains(TOPIC));
|
||||||
|
System.err.println("message txt:" + m1.getText());
|
||||||
|
|
||||||
|
m1.acknowledge();
|
||||||
|
// leave m2 for reconnect on server1
|
||||||
|
|
||||||
conn1.close();
|
conn1.close();
|
||||||
conn2.close();
|
conn2.close();
|
||||||
|
@ -99,7 +109,103 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
|
for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
|
||||||
|
System.err.println("server2: size of pages store: " + psName + " :" + server2.getPagingManager().getPageStore(psName).getAddressSize());
|
||||||
assertTrue("non negative size: " + psName, server2.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
|
assertTrue("non negative size: " + psName, server2.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (forcePaging) {
|
||||||
|
// message in the store, should have getPagedSize or is there some such thing?
|
||||||
|
assertTrue("size on 2", server2.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getNumberOfPages() > 0);
|
||||||
|
} else {
|
||||||
|
assertTrue("size on 2", server2.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getAddressSize() > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// reconnect
|
||||||
|
// get message for someClient2 on server 1 (cf1)
|
||||||
|
conn1 = cf1.createConnection();
|
||||||
|
|
||||||
|
conn1.setClientID("someClient2");
|
||||||
|
|
||||||
|
conn1.start();
|
||||||
|
|
||||||
|
session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
cons1 = session1.createDurableSubscriber(topic1, "sub2");
|
||||||
|
|
||||||
|
m2 = (TextMessage) cons1.receive(5000);
|
||||||
|
assertNotNull(m2);
|
||||||
|
assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
|
||||||
|
System.err.println("sub2 on 1, message txt:" + m2.getText());
|
||||||
|
|
||||||
|
m2.acknowledge();
|
||||||
|
|
||||||
|
// publish another
|
||||||
|
prod1 = session1.createProducer(null);
|
||||||
|
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
|
||||||
|
prod1.send(topic1, session1.createTextMessage("someOtherMessage"));
|
||||||
|
|
||||||
|
// stays on server 1
|
||||||
|
assertTrue("some size on 1", server1.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getAddressSize() > 0);
|
||||||
|
assertTrue("no size on 2", server2.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getAddressSize() == 0);
|
||||||
|
|
||||||
|
// duplicate this sub on 2
|
||||||
|
conn2 = cf2.createConnection();
|
||||||
|
|
||||||
|
conn2.setClientID("someClient2");
|
||||||
|
|
||||||
|
conn2.start();
|
||||||
|
|
||||||
|
session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
// the clientId unique guarantee is per broker, not per cluster
|
||||||
|
cons2 = session2.createDurableSubscriber(topic1, "sub2");
|
||||||
|
|
||||||
|
// should not be able to consume yet
|
||||||
|
m2 = (TextMessage) cons2.receiveNoWait();
|
||||||
|
assertNull("did not get message", m2);
|
||||||
|
|
||||||
|
// still available on cons1
|
||||||
|
m2 = (TextMessage) cons1.receive(5000);
|
||||||
|
assertNotNull("got message", m2);
|
||||||
|
assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
|
||||||
|
System.err.println("sub2 on 1, message txt:" + m2.getText());
|
||||||
|
m2.acknowledge();
|
||||||
|
|
||||||
|
// if we send another, lb will kick in..
|
||||||
|
prod1.send(topic1, session1.createTextMessage("someOtherOtherMessage"));
|
||||||
|
|
||||||
|
m2 = (TextMessage) cons2.receive(5000);
|
||||||
|
assertNotNull("got message", m2);
|
||||||
|
assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
|
||||||
|
|
||||||
|
System.err.println("sub2 on 2: message txt:" + m2.getText());
|
||||||
|
m2.acknowledge();
|
||||||
|
|
||||||
|
// no duplicate, not available on cons1
|
||||||
|
m2 = (TextMessage) cons1.receiveNoWait();
|
||||||
|
assertNull("non null message", m2);
|
||||||
|
|
||||||
|
conn1.close();
|
||||||
|
conn2.close();
|
||||||
|
|
||||||
|
// pick up sub1:someClient1 messages, one from each broker
|
||||||
|
for (ConnectionFactory cf : new ConnectionFactory[]{cf2, cf1}) {
|
||||||
|
conn2 = cf.createConnection();
|
||||||
|
conn2.setClientID("someClient1");
|
||||||
|
conn2.start();
|
||||||
|
|
||||||
|
session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
cons2 = session2.createDurableSubscriber(topic1, "sub1");
|
||||||
|
|
||||||
|
m2 = (TextMessage) cons2.receive(5000);
|
||||||
|
assertNotNull("got message", m2);
|
||||||
|
assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
|
||||||
|
System.err.println("sub1 on: " + cf + " - message txt:" + m2.getText());
|
||||||
|
m2.acknowledge();
|
||||||
|
|
||||||
|
conn2.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue