mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2789 and https://issues.apache.org/activemq/browse/AMQ-2843 - adds tests for cursors and stores and fixes a PrioritizedPendingList bug
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@966223 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
03f38ae402
commit
c4425caa2f
|
@ -506,6 +506,9 @@
|
||||||
<!-- breaks hudson: disable till we get a chance to give it the time that it needs - http://hudson.zones.apache.org/hudson/job/ActiveMQ/org.apache.activemq$activemq-core/199/testReport/org.apache.activemq.network/BrokerNetworkWithStuckMessagesTest/testBrokerNetworkWithStuckMessages/ -->
|
<!-- breaks hudson: disable till we get a chance to give it the time that it needs - http://hudson.zones.apache.org/hudson/job/ActiveMQ/org.apache.activemq$activemq-core/199/testReport/org.apache.activemq.network/BrokerNetworkWithStuckMessagesTest/testBrokerNetworkWithStuckMessages/ -->
|
||||||
<exclude>**/BrokerNetworkWithStuckMessagesTest.*</exclude>
|
<exclude>**/BrokerNetworkWithStuckMessagesTest.*</exclude>
|
||||||
|
|
||||||
|
<!-- exclude until implemented -->
|
||||||
|
<exclude>**/JDBCMessagePriorityTest.*</exclude>
|
||||||
|
|
||||||
</excludes>
|
</excludes>
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
|
|
@ -82,7 +82,7 @@ public class PrioritizedPendingList implements PendingList {
|
||||||
protected int getPriority(MessageReference message) {
|
protected int getPriority(MessageReference message) {
|
||||||
int priority = javax.jms.Message.DEFAULT_PRIORITY;
|
int priority = javax.jms.Message.DEFAULT_PRIORITY;
|
||||||
if (message.getMessageId() != null) {
|
if (message.getMessageId() != null) {
|
||||||
Math.max(message.getMessage().getPriority(), 0);
|
priority = Math.max(message.getMessage().getPriority(), 0);
|
||||||
priority = Math.min(priority, 9);
|
priority = Math.min(priority, 9);
|
||||||
}
|
}
|
||||||
return priority;
|
return priority;
|
||||||
|
|
|
@ -18,19 +18,28 @@
|
||||||
package org.apache.activemq.store;
|
package org.apache.activemq.store;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
|
import javax.jms.TopicSubscriber;
|
||||||
import junit.framework.TestCase;
|
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.CombinationTestSupport;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
abstract public class MessagePriorityTest extends TestCase {
|
abstract public class MessagePriorityTest extends CombinationTestSupport {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(MessagePriorityTest.class);
|
||||||
|
|
||||||
BrokerService broker;
|
BrokerService broker;
|
||||||
PersistenceAdapter adapter;
|
PersistenceAdapter adapter;
|
||||||
|
@ -39,6 +48,12 @@ abstract public class MessagePriorityTest extends TestCase {
|
||||||
Connection conn;
|
Connection conn;
|
||||||
Session sess;
|
Session sess;
|
||||||
|
|
||||||
|
public boolean useCache;
|
||||||
|
|
||||||
|
int MSG_NUM = 1000;
|
||||||
|
int HIGH_PRI = 7;
|
||||||
|
int LOW_PRI = 3;
|
||||||
|
|
||||||
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
|
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
|
||||||
|
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
|
@ -48,6 +63,7 @@ abstract public class MessagePriorityTest extends TestCase {
|
||||||
broker.setPersistenceAdapter(adapter);
|
broker.setPersistenceAdapter(adapter);
|
||||||
PolicyEntry policy = new PolicyEntry();
|
PolicyEntry policy = new PolicyEntry();
|
||||||
policy.setPrioritizedMessages(true);
|
policy.setPrioritizedMessages(true);
|
||||||
|
policy.setUseCache(useCache);
|
||||||
PolicyMap policyMap = new PolicyMap();
|
PolicyMap policyMap = new PolicyMap();
|
||||||
policyMap.setDefaultEntry(policy);
|
policyMap.setDefaultEntry(policy);
|
||||||
broker.setDestinationPolicy(policyMap);
|
broker.setDestinationPolicy(policyMap);
|
||||||
|
@ -56,6 +72,8 @@ abstract public class MessagePriorityTest extends TestCase {
|
||||||
|
|
||||||
factory = new ActiveMQConnectionFactory("vm://priorityTest");
|
factory = new ActiveMQConnectionFactory("vm://priorityTest");
|
||||||
conn = factory.createConnection();
|
conn = factory.createConnection();
|
||||||
|
conn.setClientID("priority");
|
||||||
|
conn.start();
|
||||||
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,7 +93,7 @@ abstract public class MessagePriorityTest extends TestCase {
|
||||||
MessageProducer topicProducer = sess.createProducer(topic);
|
MessageProducer topicProducer = sess.createProducer(topic);
|
||||||
|
|
||||||
|
|
||||||
Thread.sleep(100); // get it all propagated
|
Thread.sleep(500); // get it all propagated
|
||||||
|
|
||||||
assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages());
|
assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages());
|
||||||
assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages());
|
assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages());
|
||||||
|
@ -85,4 +103,84 @@ abstract public class MessagePriorityTest extends TestCase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class ProducerThread extends Thread {
|
||||||
|
|
||||||
|
int priority;
|
||||||
|
int messageCount;
|
||||||
|
ActiveMQDestination dest;
|
||||||
|
|
||||||
|
public ProducerThread(ActiveMQDestination dest, int messageCount, int priority) {
|
||||||
|
this.messageCount = messageCount;
|
||||||
|
this.priority = priority;
|
||||||
|
this.dest = dest;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
MessageProducer producer = sess.createProducer(dest);
|
||||||
|
producer.setPriority(priority);
|
||||||
|
for (int i = 0; i < messageCount; i++) {
|
||||||
|
producer.send(sess.createTextMessage("message priority: " + priority));
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initCombosForTestQueues() {
|
||||||
|
addCombinationValues("useCache", new Object[] {new Boolean(true), new Boolean(false)});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testQueues() throws Exception {
|
||||||
|
ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST");
|
||||||
|
|
||||||
|
ProducerThread lowPri = new ProducerThread(queue, MSG_NUM, LOW_PRI);
|
||||||
|
ProducerThread highPri = new ProducerThread(queue, MSG_NUM, HIGH_PRI);
|
||||||
|
|
||||||
|
lowPri.start();
|
||||||
|
highPri.start();
|
||||||
|
|
||||||
|
lowPri.join();
|
||||||
|
highPri.join();
|
||||||
|
|
||||||
|
MessageConsumer queueConsumer = sess.createConsumer(queue);
|
||||||
|
for (int i = 0; i < MSG_NUM * 2; i++) {
|
||||||
|
Message msg = queueConsumer.receive(1000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Message createMessage(int priority) throws Exception {
|
||||||
|
final String text = "Message with priority " + priority;
|
||||||
|
Message msg = sess.createTextMessage(text);
|
||||||
|
LOG.info("Sending " + text);
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDurableSubs() throws Exception {
|
||||||
|
ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
|
||||||
|
TopicSubscriber sub = sess.createDurableSubscriber(topic, "priority");
|
||||||
|
sub.close();
|
||||||
|
|
||||||
|
ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
|
||||||
|
ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
|
||||||
|
|
||||||
|
lowPri.start();
|
||||||
|
highPri.start();
|
||||||
|
|
||||||
|
lowPri.join();
|
||||||
|
highPri.join();
|
||||||
|
|
||||||
|
sub = sess.createDurableSubscriber(topic, "priority");
|
||||||
|
for (int i = 0; i < MSG_NUM * 2; i++) {
|
||||||
|
Message msg = sub.receive(1000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
|
|
||||||
package org.apache.activemq.store.jdbc;
|
package org.apache.activemq.store.jdbc;
|
||||||
|
|
||||||
|
import junit.framework.Test;
|
||||||
|
|
||||||
import org.apache.activemq.store.MessagePriorityTest;
|
import org.apache.activemq.store.MessagePriorityTest;
|
||||||
import org.apache.activemq.store.PersistenceAdapter;
|
import org.apache.activemq.store.PersistenceAdapter;
|
||||||
import org.apache.derby.jdbc.EmbeddedDataSource;
|
import org.apache.derby.jdbc.EmbeddedDataSource;
|
||||||
|
@ -30,7 +32,12 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
|
||||||
dataSource.setDatabaseName("derbyDb");
|
dataSource.setDatabaseName("derbyDb");
|
||||||
dataSource.setCreateDatabase("create");
|
dataSource.setCreateDatabase("create");
|
||||||
jdbc.setDataSource(dataSource);
|
jdbc.setDataSource(dataSource);
|
||||||
|
jdbc.deleteAllMessages();
|
||||||
return jdbc;
|
return jdbc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Test suite() {
|
||||||
|
return suite(JDBCMessagePriorityTest.class);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue