Fix org.apache.activemq.JmsQueueCompositeSendReceiveTest#testDuplicate

This test uses activemq5 native API to examine queue message count
    The fix uses core's QueueQuery API to do the job instead.
This commit is contained in:
Howard Gao 2015-08-05 21:00:33 +08:00 committed by Clebert Suconic
parent 72b7f82712
commit 896813fc1c
3 changed files with 27 additions and 19 deletions

View File

@ -84,6 +84,12 @@
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>
<artifactId>activemq-leveldb-store</artifactId> <artifactId>activemq-leveldb-store</artifactId>
<version>${activemq5.project.version}</version> <version>${activemq5.project.version}</version>
<exclusions>
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -62,7 +62,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
server.getConfiguration().getAcceptorConfigurations().clear(); server.getConfiguration().getAcceptorConfigurations().clear();
HashMap<String, Object> params = new HashMap<String, Object>(); HashMap<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.PORT_PROP_NAME, "61616"); params.put(TransportConstants.PORT_PROP_NAME, "61616");
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE"); params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE,CORE");
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
Configuration serverConfig = server.getConfiguration(); Configuration serverConfig = server.getConfiguration();

View File

@ -20,16 +20,17 @@ import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic; import javax.jms.Topic;
import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.broker.region.Queue; import org.apache.activemq.artemis.jms.client.ActiveMQSession;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.artemis.uri.ConnectionFactoryParser;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.test.JmsTopicSendReceiveTest; import org.apache.activemq.test.JmsTopicSendReceiveTest;
import org.apache.activemq.util.Wait;
import java.net.URI;
/** /**
@ -94,7 +95,7 @@ public class JmsQueueCompositeSendReceiveTest extends JmsTopicSendReceiveTest {
} }
public void testDuplicate() throws Exception { public void testDuplicate() throws Exception {
ActiveMQDestination queue = (ActiveMQDestination)session.createQueue("TEST,TEST"); ActiveMQDestination queue = (ActiveMQDestination)session.createQueue("TEST,TEST");
for (int i = 0; i < data.length; i++) { for (int i = 0; i < data.length; i++) {
Message message = createMessage(i); Message message = createMessage(i);
configureMessage(message); configureMessage(message);
@ -104,16 +105,17 @@ public class JmsQueueCompositeSendReceiveTest extends JmsTopicSendReceiveTest {
producer.send(queue, message); producer.send(queue, message);
} }
Thread.sleep(200); // wait for messages to be queued Thread.sleep(200); // wait for messages to be queue;
BrokerService broker = BrokerRegistry.getInstance().lookup("localhost"); org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory factory = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?type=CF");
final Queue dest = (Queue)((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(new ActiveMQQueue("TEST")); org.apache.activemq.artemis.jms.client.ActiveMQConnection conn = (org.apache.activemq.artemis.jms.client.ActiveMQConnection) factory.createConnection();
assertTrue("all messages were received", Wait.waitFor(new Wait.Condition(){ try {
public boolean isSatisified() throws Exception { ActiveMQSession session = (ActiveMQSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
return data.length == dest.getDestinationStatistics().getMessages().getCount(); ClientSession.QueueQuery query = session.getCoreSession().queueQuery(new SimpleString("jms.queue.TEST"));
}})); assertNotNull(query);
assertEquals(data.length, query.getMessageCount());
dest.purge(); } finally {
assertEquals(0, dest.getDestinationStatistics().getMessages().getCount()); conn.close();
}
} }
} }