diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java index 3c79fcff79..59f7faac8f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java @@ -14,12 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.bugs; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.net.ServerSocket; +import static org.junit.Assert.fail; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -31,59 +29,34 @@ import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQTopicSubscriber; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQTopic; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.fail; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQ3678Test implements MessageListener { - public int deliveryMode = DeliveryMode.NON_PERSISTENT; - + private static Logger LOG = LoggerFactory.getLogger(AMQ3678Test.class); private BrokerService broker; + private String connectionURI; - AtomicInteger messagesSent = new AtomicInteger(0); - AtomicInteger messagesReceived = new AtomicInteger(0); + private final AtomicInteger messagesSent = new AtomicInteger(0); + private final AtomicInteger messagesReceived = new AtomicInteger(0); + private final ActiveMQTopic destination = new ActiveMQTopic("XYZ"); + private final CountDownLatch latch = new CountDownLatch(2); + private final int deliveryMode = DeliveryMode.NON_PERSISTENT; - ActiveMQTopic destination = new ActiveMQTopic("XYZ"); - - int port; - int jmxport; - - - final CountDownLatch latch = new CountDownLatch(2); - - - public static void main(String[] args) throws Exception { - - } - - - public static int findFreePort() throws IOException { - ServerSocket socket = null; - - try { - // 0 is open a socket on any free port - socket = new ServerSocket(0); - return socket.getLocalPort(); - } finally { - if (socket != null) { - socket.close(); - } - } - } - - - @Test + @Test(timeout = 60000) public void countConsumers() throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + port); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI); factory.setAlwaysSyncSend(true); factory.setDispatchAsync(false); @@ -95,13 +68,13 @@ public class AMQ3678Test implements MessageListener { consumerConnection.setClientID("subscriber1"); Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - ActiveMQTopicSubscriber activeConsumer = (ActiveMQTopicSubscriber) consumerMQSession.createDurableSubscriber(destination, "myTopic?consumer.prefetchSize=1"); + ActiveMQTopicSubscriber activeConsumer = (ActiveMQTopicSubscriber) consumerMQSession.createDurableSubscriber(destination, + "myTopic?consumer.prefetchSize=1"); activeConsumer.setMessageListener(this); consumerConnection.start(); - final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); final MessageProducer producer = producerSession.createProducer(destination); producer.setDeliveryMode(deliveryMode); @@ -110,17 +83,14 @@ public class AMQ3678Test implements MessageListener { private boolean done = false; + @Override public void run() { while (!done) { if (messagesSent.get() == 50) { try { broker.getAdminView().removeTopic(destination.getTopicName()); } catch (Exception e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - System.err.flush(); - fail("Unable to remove destination:" - + destination.getPhysicalName()); + fail("Unable to remove destination:" + destination.getPhysicalName()); } } @@ -128,18 +98,15 @@ public class AMQ3678Test implements MessageListener { producer.send(producerSession.createTextMessage()); int val = messagesSent.incrementAndGet(); - System.out.println("sent message (" + val + ")"); - System.out.flush(); + LOG.trace("sent message (" + val + ")"); if (val == 100) { done = true; latch.countDown(); producer.close(); producerSession.close(); - } } catch (JMSException e) { - // TODO Auto-generated catch block e.printStackTrace(); } } @@ -153,68 +120,42 @@ public class AMQ3678Test implements MessageListener { fail("did not receive all the messages"); } } catch (InterruptedException e) { - // TODO Auto-generated catch block fail("did not receive all the messages, exception waiting for latch"); - e.printStackTrace(); } - - -// - - } @Before public void setUp() throws Exception { - - try { - port = findFreePort(); - jmxport = findFreePort(); - } catch (Exception e) { - fail("Unable to obtain a free port on which to start the broker"); - } - - System.out.println("Starting broker"); - System.out.flush(); broker = new BrokerService(); broker.setPersistent(false); - ManagementContext ctx = new ManagementContext(ManagementFactory.getPlatformMBeanServer()); - ctx.setConnectorPort(jmxport); - broker.setManagementContext(ctx); broker.setUseJmx(true); -// broker.setAdvisorySupport(false); -// broker.setDeleteAllMessagesOnStartup(true); + broker.getManagementContext().setCreateConnector(false); + broker.setAdvisorySupport(false); - broker.addConnector("tcp://localhost:" + port).setName("Default"); + TransportConnector connector = broker.addConnector("tcp://localhost:0"); broker.start(); + broker.waitUntilStarted(); - - System.out.println("End of Broker Setup"); - System.out.flush(); + connectionURI = connector.getPublishableConnectString(); } @After public void tearDown() throws Exception { broker.stop(); + broker.waitUntilStopped(); } - @Override public void onMessage(Message message) { try { message.acknowledge(); int val = messagesReceived.incrementAndGet(); - System.out.println("received message (" + val + ")"); - System.out.flush(); + LOG.trace("received message (" + val + ")"); if (messagesReceived.get() == 100) { latch.countDown(); } } catch (JMSException e) { - // TODO Auto-generated catch block e.printStackTrace(); } - } - - }