Clean up some warning, remove System.out calls, remove references to
static ports.
This commit is contained in:
Timothy Bish 2015-07-09 13:52:30 -04:00
parent f10aab6428
commit 21c3ba3582
2 changed files with 66 additions and 59 deletions

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@ -33,31 +34,41 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQ2512Test extends EmbeddedBrokerTestSupport {
private static Connection connection;
private final static String QUEUE_NAME = "dee.q";
private final static int INITIAL_MESSAGES_CNT = 1000;
private final static int WORKER_INTERNAL_ITERATIONS = 100;
private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS
+ INITIAL_MESSAGES_CNT;
private final static byte[] payload = new byte[5 * 1024];
private final static String TEXT = new String(payload);
public class AMQ2512Test {
private final static String PRP_INITIAL_ID = "initial-id";
private final static String PRP_WORKER_ID = "worker-id";
private static final Logger LOG = LoggerFactory.getLogger(AMQ2512Test.class);
private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
private final String QUEUE_NAME = "dee.q";
private final int INITIAL_MESSAGES_CNT = 1000;
private final int WORKER_INTERNAL_ITERATIONS = 100;
private final int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS + INITIAL_MESSAGES_CNT;
private final byte[] payload = new byte[5 * 1024];
private final String TEXT = new String(payload);
private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger();
private final String PRP_INITIAL_ID = "initial-id";
private final String PRP_WORKER_ID = "worker-id";
private final CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
private final AtomicInteger ON_MSG_COUNTER = new AtomicInteger();
private BrokerService brokerService;
private Connection connection;
private String connectionURI;
@Test(timeout = 60000)
public void testKahaDBFailure() throws Exception {
final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress);
final ConnectionFactory fac = new ActiveMQConnectionFactory(connectionURI);
connection = fac.createConnection();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue(QUEUE_NAME);
@ -80,18 +91,20 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport {
LATCH.await();
final long endTime = System.nanoTime();
System.out.println("Total execution time = "
LOG.info("Total execution time = "
+ TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [ms].");
System.out.println("Rate = " + TOTAL_MESSAGES_CNT
LOG.info("Rate = " + TOTAL_MESSAGES_CNT
/ TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [msg/s].");
for (Consumer c : consumers) {
c.close();
}
connection.close();
}
private final static class Consumer implements MessageListener {
private final class Consumer implements MessageListener {
private final String name;
private final Session session;
private final MessageProducer producer;
@ -111,6 +124,7 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport {
}
}
@Override
public void onMessage(Message message) {
final TextMessage msg = (TextMessage) message;
try {
@ -130,7 +144,7 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport {
} finally {
final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement();
if (onMsgCounter % 1000 == 0) {
System.out.println("message received: " + onMsgCounter);
LOG.info("message received: " + onMsgCounter);
}
LATCH.countDown();
}
@ -148,27 +162,37 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport {
}
}
@Override
protected void setUp() throws Exception {
bindAddress = "tcp://0.0.0.0:61617";
super.setUp();
@Before
public void setUp() throws Exception {
brokerService = createBroker();
brokerService.start();
connectionURI = brokerService.getTransportConnectorByName("openwire").getPublishableConnectString();
}
@After
public void tearDown() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
}
}
@Override
protected BrokerService createBroker() throws Exception {
File dataFileDir = new File("target/test-amq-2512/datadb");
IOHelper.mkdirs(dataFileDir);
IOHelper.deleteChildren(dataFileDir);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFileDir);
kaha.setDirectory(dataFileDir);
kaha.setEnableJournalDiskSyncs(false);
BrokerService answer = new BrokerService();
answer.setPersistenceAdapter(kaha);
kaha.setEnableJournalDiskSyncs(false);
//kaha.setIndexCacheSize(10);
answer.setDataDirectoryFile(dataFileDir);
answer.setUseJmx(false);
answer.addConnector(bindAddress);
answer.addConnector("tcp://localhost:0").setName("openwire");
return answer;
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -24,7 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -32,6 +34,7 @@ import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
@ -49,19 +52,16 @@ import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
/**
* Stuck messages test client.
* <p/>
* Will kick of publisher and consumer simultaneously, and will usually result in stuck messages on the queue.
* Will kick of publisher and consumer simultaneously, and will usually result in
* stuck messages on the queue.
*/
@RunWith(Parameterized.class)
public class AMQ5266Test {
static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);
String activemqURL = "tcp://localhost:61617";
BrokerService brokerService;
private String activemqURL;
private BrokerService brokerService;
public int messageSize = 1000;
@ -167,7 +167,6 @@ public class AMQ5266Test {
consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
LOG.info("Starting Publisher...");
publisher.start();
@ -178,17 +177,15 @@ public class AMQ5266Test {
int distinctPublishedCount = 0;
LOG.info("Waiting For Publisher Completion...");
publisher.waitForCompletion();
List publishedIds = publisher.getIDs();
distinctPublishedCount = new TreeSet(publishedIds).size();
List<String> publishedIds = publisher.getIDs();
distinctPublishedCount = new TreeSet<String>(publishedIds).size();
LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
while (!consumer.completed() && System.currentTimeMillis() < endWait) {
try {
@ -221,7 +218,6 @@ public class AMQ5266Test {
LOG.info(sb.toString());
assertEquals("expect to get all messages!", 0, diff);
}
}
@ -315,6 +311,7 @@ public class AMQ5266Test {
mp = session.createProducer(q);
}
@Override
public void run() {
try {
@ -354,7 +351,6 @@ public class AMQ5266Test {
}
}
}
}
String messageText;
@ -378,7 +374,6 @@ public class AMQ5266Test {
return messageText;
}
public class ExportQueueConsumer {
private final String amqUser = ActiveMQConnection.DEFAULT_USER;
@ -428,11 +423,8 @@ public class AMQ5266Test {
// Start the threads
public void start() throws Exception {
for (List<ConsumerThread> list : threads.values()) {
for (ConsumerThread ct : list) {
ct.start();
}
}
@ -441,19 +433,14 @@ public class AMQ5266Test {
// Tell the threads to stop
// Then wait for them to stop
public void shutdown() throws Exception {
for (List<ConsumerThread> list : threads.values()) {
for (ConsumerThread ct : list) {
ct.shutdown();
}
}
for (List<ConsumerThread> list : threads.values()) {
for (ConsumerThread ct : list) {
ct.join();
}
}
@ -517,6 +504,7 @@ public class AMQ5266Test {
idList = idsByQueue.get(queueName);
}
@Override
public void run() {
try {
@ -554,13 +542,10 @@ public class AMQ5266Test {
session.commit();
count = 0;
// Sleep a little before trying to read after not getting a message
try {
if (idList.size() < totalToExpect) {
LOG.info("did not receive on {}, current count: {}", qName, idList.size());
}
//sleep(3000);
} catch (Exception e) {
}
}
@ -568,7 +553,6 @@ public class AMQ5266Test {
} catch (Exception e) {
e.printStackTrace();
} finally {
// Once we exit, close everything
close();
}
@ -593,7 +577,6 @@ public class AMQ5266Test {
try {
qc.close();
} catch (Exception e) {
}
}
}