From 21c3ba35824ce09aa69ed785397d041a98f8d18f Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 9 Jul 2015 13:52:30 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5621 Clean up some warning, remove System.out calls, remove references to static ports. --- .../org/apache/activemq/bugs/AMQ2512Test.java | 82 ++++++++++++------- .../org/apache/activemq/bugs/AMQ5266Test.java | 43 +++------- 2 files changed, 66 insertions(+), 59 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java index 669066e1ee..a29054969c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; @@ -33,31 +34,41 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; + import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class AMQ2512Test extends EmbeddedBrokerTestSupport { - private static Connection connection; - private final static String QUEUE_NAME = "dee.q"; - private final static int INITIAL_MESSAGES_CNT = 1000; - private final static int WORKER_INTERNAL_ITERATIONS = 100; - private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS - + INITIAL_MESSAGES_CNT; - private final static byte[] payload = new byte[5 * 1024]; - private final static String TEXT = new String(payload); +public class AMQ2512Test { - private final static String PRP_INITIAL_ID = "initial-id"; - private final static String PRP_WORKER_ID = "worker-id"; + private static final Logger LOG = LoggerFactory.getLogger(AMQ2512Test.class); - private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT); + private final String QUEUE_NAME = "dee.q"; + private final int INITIAL_MESSAGES_CNT = 1000; + private final int WORKER_INTERNAL_ITERATIONS = 100; + private final int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS + INITIAL_MESSAGES_CNT; + private final byte[] payload = new byte[5 * 1024]; + private final String TEXT = new String(payload); - private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger(); + private final String PRP_INITIAL_ID = "initial-id"; + private final String PRP_WORKER_ID = "worker-id"; + private final CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT); + private final AtomicInteger ON_MSG_COUNTER = new AtomicInteger(); + + private BrokerService brokerService; + private Connection connection; + private String connectionURI; + + @Test(timeout = 60000) public void testKahaDBFailure() throws Exception { - final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress); + final ConnectionFactory fac = new ActiveMQConnectionFactory(connectionURI); connection = fac.createConnection(); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue queue = session.createQueue(QUEUE_NAME); @@ -80,18 +91,20 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport { LATCH.await(); final long endTime = System.nanoTime(); - System.out.println("Total execution time = " + LOG.info("Total execution time = " + TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [ms]."); - System.out.println("Rate = " + TOTAL_MESSAGES_CNT + LOG.info("Rate = " + TOTAL_MESSAGES_CNT / TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [msg/s]."); for (Consumer c : consumers) { c.close(); } + connection.close(); } - private final static class Consumer implements MessageListener { + private final class Consumer implements MessageListener { + private final String name; private final Session session; private final MessageProducer producer; @@ -111,6 +124,7 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport { } } + @Override public void onMessage(Message message) { final TextMessage msg = (TextMessage) message; try { @@ -130,7 +144,7 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport { } finally { final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement(); if (onMsgCounter % 1000 == 0) { - System.out.println("message received: " + onMsgCounter); + LOG.info("message received: " + onMsgCounter); } LATCH.countDown(); } @@ -148,27 +162,37 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport { } } - @Override - protected void setUp() throws Exception { - bindAddress = "tcp://0.0.0.0:61617"; - super.setUp(); + @Before + public void setUp() throws Exception { + brokerService = createBroker(); + brokerService.start(); + + connectionURI = brokerService.getTransportConnectorByName("openwire").getPublishableConnectString(); + } + + @After + public void tearDown() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + } } - @Override protected BrokerService createBroker() throws Exception { File dataFileDir = new File("target/test-amq-2512/datadb"); IOHelper.mkdirs(dataFileDir); IOHelper.deleteChildren(dataFileDir); + KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(dataFileDir); + kaha.setDirectory(dataFileDir); + kaha.setEnableJournalDiskSyncs(false); + BrokerService answer = new BrokerService(); answer.setPersistenceAdapter(kaha); - - kaha.setEnableJournalDiskSyncs(false); - //kaha.setIndexCacheSize(10); answer.setDataDirectoryFile(dataFileDir); answer.setUseJmx(false); - answer.addConnector(bindAddress); + answer.addConnector("tcp://localhost:0").setName("openwire"); + return answer; } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java index e180746939..d5ab676e1c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.bugs; +import static org.junit.Assert.assertEquals; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -24,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.TimeUnit; + import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -32,6 +34,7 @@ import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.Session; import javax.jms.TextMessage; + import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; @@ -49,19 +52,16 @@ import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; - /** - * Stuck messages test client. - *

- * Will kick of publisher and consumer simultaneously, and will usually result in stuck messages on the queue. + * Will kick of publisher and consumer simultaneously, and will usually result in + * stuck messages on the queue. */ @RunWith(Parameterized.class) public class AMQ5266Test { static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class); - String activemqURL = "tcp://localhost:61617"; - BrokerService brokerService; + + private String activemqURL; + private BrokerService brokerService; public int messageSize = 1000; @@ -167,7 +167,6 @@ public class AMQ5266Test { consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount); - LOG.info("Starting Publisher..."); publisher.start(); @@ -178,17 +177,15 @@ public class AMQ5266Test { int distinctPublishedCount = 0; - LOG.info("Waiting For Publisher Completion..."); publisher.waitForCompletion(); - List publishedIds = publisher.getIDs(); - distinctPublishedCount = new TreeSet(publishedIds).size(); + List publishedIds = publisher.getIDs(); + distinctPublishedCount = new TreeSet(publishedIds).size(); LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount); - long endWait = System.currentTimeMillis() + consumerWaitForConsumption; while (!consumer.completed() && System.currentTimeMillis() < endWait) { try { @@ -221,7 +218,6 @@ public class AMQ5266Test { LOG.info(sb.toString()); assertEquals("expect to get all messages!", 0, diff); - } } @@ -315,6 +311,7 @@ public class AMQ5266Test { mp = session.createProducer(q); } + @Override public void run() { try { @@ -354,7 +351,6 @@ public class AMQ5266Test { } } } - } String messageText; @@ -378,7 +374,6 @@ public class AMQ5266Test { return messageText; } - public class ExportQueueConsumer { private final String amqUser = ActiveMQConnection.DEFAULT_USER; @@ -428,11 +423,8 @@ public class AMQ5266Test { // Start the threads public void start() throws Exception { - for (List list : threads.values()) { - for (ConsumerThread ct : list) { - ct.start(); } } @@ -441,19 +433,14 @@ public class AMQ5266Test { // Tell the threads to stop // Then wait for them to stop public void shutdown() throws Exception { - for (List list : threads.values()) { - for (ConsumerThread ct : list) { - ct.shutdown(); } } for (List list : threads.values()) { - for (ConsumerThread ct : list) { - ct.join(); } } @@ -517,6 +504,7 @@ public class AMQ5266Test { idList = idsByQueue.get(queueName); } + @Override public void run() { try { @@ -554,13 +542,10 @@ public class AMQ5266Test { session.commit(); count = 0; - // Sleep a little before trying to read after not getting a message - try { if (idList.size() < totalToExpect) { LOG.info("did not receive on {}, current count: {}", qName, idList.size()); } - //sleep(3000); } catch (Exception e) { } } @@ -568,7 +553,6 @@ public class AMQ5266Test { } catch (Exception e) { e.printStackTrace(); } finally { - // Once we exit, close everything close(); } @@ -593,7 +577,6 @@ public class AMQ5266Test { try { qc.close(); } catch (Exception e) { - } } }