diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml index 12e6d9519b..e4f1b130e2 100755 --- a/activemq-mqtt/pom.xml +++ b/activemq-mqtt/pom.xml @@ -207,7 +207,7 @@ ${surefire.argLine} alphabetical - false + org.apache.activemq.default.directory.prefix diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLevelDBTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLevelDBTest.java new file mode 100644 index 0000000000..e76e66ae07 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLevelDBTest.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import static org.junit.Assert.assertEquals; + +import java.net.URI; +import java.util.Enumeration; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.QueueBrowser; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QueueBrowsingLevelDBTest { + + private static final Logger LOG = LoggerFactory.getLogger(QueueBrowsingLevelDBTest.class); + + private BrokerService broker; + private URI connectUri; + private ActiveMQConnectionFactory factory; + + + @Before + public void startBroker() throws Exception { + createBroker(); + TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0"); + broker.deleteAllMessages(); + broker.start(); + broker.waitUntilStarted(); + connectUri = connector.getConnectUri(); + factory = new ActiveMQConnectionFactory(connectUri); + } + + private void createBroker() { + broker = new BrokerService(); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + @Test + public void testBrowsing() throws JMSException { + + int messageToSend = 370; + + ActiveMQQueue queue = new ActiveMQQueue("TEST"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + + String data = ""; + for( int i=0; i < 1024*2; i++ ) { + data += "x"; + } + + for( int i=0; i < messageToSend; i++ ) { + producer.send(session.createTextMessage(data)); + } + + QueueBrowser browser = session.createBrowser(queue); + Enumeration enumeration = browser.getEnumeration(); + int received = 0; + while (enumeration.hasMoreElements()) { + Message m = (Message) enumeration.nextElement(); + received++; + LOG.info("Browsed message " + received + ": " + m.getJMSMessageID()); + } + + browser.close(); + + assertEquals(messageToSend, received); + } + + @Test + public void testBrowseConcurrent() throws Exception { + final int messageToSend = 370; + + final ActiveMQQueue queue = new ActiveMQQueue("TEST"); + Connection connection = factory.createConnection(); + connection.start(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + + String data = ""; + for( int i=0; i < 1024*2; i++ ) { + data += "x"; + } + + for( int i=0; i < messageToSend; i++ ) { + producer.send(session.createTextMessage(data)); + } + + Thread browserThread = new Thread() { + @Override + public void run() { + try { + QueueBrowser browser = session.createBrowser(queue); + Enumeration enumeration = browser.getEnumeration(); + int received = 0; + while (enumeration.hasMoreElements()) { + Message m = (Message) enumeration.nextElement(); + received++; + LOG.info("Browsed message " + received + ": " + m.getJMSMessageID()); + } + assertEquals("Browsed all messages", messageToSend, received); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + browserThread.start(); + + Thread consumerThread = new Thread() { + @Override + public void run() { + try { + MessageConsumer consumer = session.createConsumer(queue); + int received = 0; + while (true) { + Message m = consumer.receive(1000); + if (m == null) + break; + received++; + } + assertEquals("Consumed all messages", messageToSend, received); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + consumerThread.start(); + + browserThread.join(); + consumerThread.join(); + } + + @Test + public void testMemoryLimit() throws Exception { + broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024); + + int messageToSend = 370; + + ActiveMQQueue queue = new ActiveMQQueue("TEST"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + + String data = ""; + for( int i=0; i < 1024*2; i++ ) { + data += "x"; + } + + for( int i=0; i < messageToSend; i++ ) { + producer.send(session.createTextMessage(data)); + } + + QueueBrowser browser = session.createBrowser(queue); + Enumeration enumeration = browser.getEnumeration(); + int received = 0; + while (enumeration.hasMoreElements()) { + Message m = (Message) enumeration.nextElement(); + received++; + LOG.info("Browsed message " + received + ": " + m.getJMSMessageID()); + } + + browser.close(); + assertEquals(3, received); + } +}