Fix warnings from use of deprecated asserts etc.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1442736 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-02-05 20:43:15 +00:00
parent 1c9626b5a3
commit 5219fa1a17
19 changed files with 357 additions and 383 deletions

View File

@ -21,17 +21,20 @@ import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* An AMQ-1282 Test
*
*/
public class AMQ1282 extends TestCase {
private ConnectionFactory factory;
private Connection connection;
private MapMessage message;
@Override
protected void setUp() throws Exception {
factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
connection = factory.createConnection();
@ -41,6 +44,7 @@ public class AMQ1282 extends TestCase {
super.setUp();
}
@Override
protected void tearDown() throws Exception {
connection.close();
super.tearDown();
@ -72,7 +76,7 @@ public class AMQ1282 extends TestCase {
Integer actual = message.getInt("foo");
assertEquals(expected, actual);
} catch (Exception ex) {
Class aClass = expected.getClass();
Class<?> aClass = expected.getClass();
assertTrue(aClass.isInstance(ex));
}
}
@ -88,7 +92,7 @@ public class AMQ1282 extends TestCase {
Short actual = message.getShort("foo");
assertEquals(expected, actual);
} catch (Exception ex) {
Class aClass = expected.getClass();
Class<?> aClass = expected.getClass();
assertTrue(aClass.isInstance(ex));
}
}
@ -104,7 +108,7 @@ public class AMQ1282 extends TestCase {
Long actual = message.getLong("foo");
assertEquals(expected, actual);
} catch (Exception ex) {
Class aClass = expected.getClass();
Class<?> aClass = expected.getClass();
assertTrue(aClass.isInstance(ex));
}
}
@ -120,7 +124,7 @@ public class AMQ1282 extends TestCase {
String actual = message.getString("foo");
assertEquals(expected, actual);
} catch (Exception ex) {
Class aClass = expected.getClass();
Class<?> aClass = expected.getClass();
assertTrue(aClass.isInstance(ex));
}
}
@ -145,7 +149,7 @@ public class AMQ1282 extends TestCase {
Byte actual = message.getByte("foo");
assertEquals(expected, actual);
} catch (Exception ex) {
Class aClass = expected.getClass();
Class<?> aClass = expected.getClass();
assertTrue(aClass.isInstance(ex));
}
}
@ -161,7 +165,7 @@ public class AMQ1282 extends TestCase {
Double actual = message.getDouble("foo");
assertEquals(expected, actual);
} catch (Exception ex) {
Class aClass = expected.getClass();
Class<?> aClass = expected.getClass();
assertTrue(aClass.isInstance(ex));
}
}
@ -177,7 +181,7 @@ public class AMQ1282 extends TestCase {
Float actual = message.getFloat("foo");
assertEquals(expected, actual);
} catch (Exception ex) {
Class aClass = expected.getClass();
Class<?> aClass = expected.getClass();
assertTrue(aClass.isInstance(ex));
}
}

View File

@ -16,12 +16,6 @@
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@ -29,6 +23,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
@ -40,53 +35,62 @@ import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger;
/**
* A AMQ1936Test
*
*/
public class AMQ1936Test extends TestCase{
private final static Logger logger = Logger.getLogger( AMQ1936Test.class );
private final static String TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue";
////--
public class AMQ1936Test extends TestCase {
private final static Logger logger = Logger.getLogger(AMQ1936Test.class);
private final static String TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue";
// //--
//
private final static long TEST_MESSAGE_COUNT = 6000; // The number of test messages to use
private final static long TEST_MESSAGE_COUNT = 6000; // The number of test messages to use
//
////--
private final static int CONSUMER_COUNT = 2; // The number of message receiver instances
private final static boolean TRANSACTED_RECEIVE = true; // Flag used by receiver which indicates messages should be processed within a JMS transaction
// //--
private final static int CONSUMER_COUNT = 2; // The number of message receiver instances
private final static boolean TRANSACTED_RECEIVE = true; // Flag used by receiver which indicates messages should be
// processed within a JMS transaction
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor( CONSUMER_COUNT,CONSUMER_COUNT, Long.MAX_VALUE,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
private ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[ CONSUMER_COUNT ];
private BrokerService broker = null;
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CONSUMER_COUNT, CONSUMER_COUNT, Long.MAX_VALUE, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
private final ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[CONSUMER_COUNT];
private BrokerService broker = null;
static QueueConnectionFactory connectionFactory = null;
@Override
protected void setUp() throws Exception {
super.setUp();
broker = new BrokerService();
broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
broker.getSystemUsage().getMemoryUsage().setLimit(5 * 1024 * 1024);
broker.setBrokerName("test");
broker.setDeleteAllMessagesOnStartup(true);
broker.start();
connectionFactory = new ActiveMQConnectionFactory("vm://test");;
connectionFactory = new ActiveMQConnectionFactory("vm://test");
;
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
if( threadPool!=null ) {
if (threadPool != null) {
// signal receivers to stop
for( ThreadedMessageReceiver receiver: receivers) {
receiver.setShouldStop( true );
for (ThreadedMessageReceiver receiver : receivers) {
receiver.setShouldStop(true);
}
logger.info("Waiting for receivers to shutdown..");
if( ! threadPool.awaitTermination( 10, TimeUnit.SECONDS ) ) {
if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
logger.warn("Not all receivers completed shutdown.");
} else {
logger.info("All receivers shutdown successfully..");
@ -95,105 +99,107 @@ public class AMQ1936Test extends TestCase{
logger.debug("Stoping the broker.");
if( broker!=null ) {
if (broker != null) {
broker.stop();
}
}
private void sendTextMessage( String queueName, int i ) throws JMSException, NamingException {
QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test");
QueueConnection queueConnection = null;
QueueSession session = null;
QueueSender sender = null;
Queue queue = null;
TextMessage message = null;
private void sendTextMessage(String queueName, int i) throws JMSException, NamingException {
QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test");
QueueConnection queueConnection = null;
QueueSession session = null;
QueueSender sender = null;
Queue queue = null;
TextMessage message = null;
try {
// Create the queue connection
queueConnection = connectionFactory.createQueueConnection();
session = queueConnection.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE );
session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
queue = session.createQueue(TEST_QUEUE_NAME);
sender = session.createSender( queue );
sender.setDeliveryMode( DeliveryMode.PERSISTENT );
sender = session.createSender(queue);
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
message = session.createTextMessage( String.valueOf(i) );
message = session.createTextMessage(String.valueOf(i));
// send the message
sender.send( message );
sender.send(message);
if( session.getTransacted()) {
if (session.getTransacted()) {
session.commit();
}
if (i%1000 == 0) {
logger.info( "Message successfully sent to : " + queue.getQueueName( ) + " messageid: " + message.getJMSMessageID( )
+ " content:" + message.getText());
if (i % 1000 == 0) {
logger.info("Message successfully sent to : " + queue.getQueueName() + " messageid: " + message.getJMSMessageID() + " content:"
+ message.getText());
}
} finally {
if( sender!=null ) {
if (sender != null) {
sender.close();
}
if( session!=null ) {
if (session != null) {
session.close();
}
if( queueConnection!=null ) {
if (queueConnection != null) {
queueConnection.close();
}
}
}
public void testForDuplicateMessages( ) throws Exception {
final ConcurrentHashMap<String,String> messages = new ConcurrentHashMap<String, String>( );
final Object lock = new Object( );
final CountDownLatch duplicateSignal = new CountDownLatch( 1 );
final AtomicInteger messageCount = new AtomicInteger( 0 );
public void testForDuplicateMessages() throws Exception {
final ConcurrentHashMap<String, String> messages = new ConcurrentHashMap<String, String>();
final Object lock = new Object();
final CountDownLatch duplicateSignal = new CountDownLatch(1);
final AtomicInteger messageCount = new AtomicInteger(0);
// add 1/2 the number of our total messages
for( int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) {
if( duplicateSignal.getCount()==0 ) {
fail( "Duplicate message id detected" );
for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) {
if (duplicateSignal.getCount() == 0) {
fail("Duplicate message id detected");
}
sendTextMessage( TEST_QUEUE_NAME, i );
sendTextMessage(TEST_QUEUE_NAME, i);
}
// create a number of consumers to read of the messages and start them with a handler which simply stores the message ids
// create a number of consumers to read of the messages and start them with a handler which simply stores the
// message ids
// in a Map and checks for a duplicate
for( int i = 0; i < CONSUMER_COUNT; i++ ) {
receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler( ) {
for (int i = 0; i < CONSUMER_COUNT; i++) {
receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler() {
public void onMessage( Message message ) throws Exception {
synchronized( lock ) {
@Override
public void onMessage(Message message) throws Exception {
synchronized (lock) {
int current = messageCount.incrementAndGet();
if (current % 1000 == 0) {
logger.info( "Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage)message).getText() );
logger.info("Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage) message).getText());
}
if( messages.containsKey( message.getJMSMessageID()) ) {
duplicateSignal.countDown( );
logger.fatal( "duplicate message id detected:" + message.getJMSMessageID() );
fail( "Duplicate message id detected:" + message.getJMSMessageID() );
if (messages.containsKey(message.getJMSMessageID())) {
duplicateSignal.countDown();
logger.fatal("duplicate message id detected:" + message.getJMSMessageID());
fail("Duplicate message id detected:" + message.getJMSMessageID());
} else {
messages.put( message.getJMSMessageID(), message.getJMSMessageID() );
messages.put(message.getJMSMessageID(), message.getJMSMessageID());
}
}
}
});
threadPool.submit( receivers[i]);
threadPool.submit(receivers[i]);
}
// starting adding the remaining messages
for(int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) {
if( duplicateSignal.getCount()==0) {
fail( "Duplicate message id detected" );
for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) {
if (duplicateSignal.getCount() == 0) {
fail("Duplicate message id detected");
}
sendTextMessage( TEST_QUEUE_NAME, i );
sendTextMessage(TEST_QUEUE_NAME, i);
}
logger.info("sent all " + TEST_MESSAGE_COUNT + " messages");
// allow some time for messages to be delivered to receivers.
boolean ok = Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return TEST_MESSAGE_COUNT == messages.size();
}
@ -201,92 +207,88 @@ public class AMQ1936Test extends TestCase{
if (!ok) {
AutoFailTestSupport.dumpAllThreads("--STUCK?--");
}
assertEquals( "Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size( ) );
assertEquals( TEST_MESSAGE_COUNT, messageCount.get() );
assertEquals("Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size());
assertEquals(TEST_MESSAGE_COUNT, messageCount.get());
}
private final static class ThreadedMessageReceiver implements Runnable {
private String queueName = null;
private IMessageHandler handler = null;
private AtomicBoolean shouldStop = new AtomicBoolean( false );
private IMessageHandler handler = null;
private final AtomicBoolean shouldStop = new AtomicBoolean(false);
public ThreadedMessageReceiver(String queueName, IMessageHandler handler ) {
this.queueName = queueName;
this.handler = handler;
public ThreadedMessageReceiver(String queueName, IMessageHandler handler) {
this.handler = handler;
}
public void run( ) {
@Override
public void run() {
QueueConnection queueConnection = null;
QueueSession session = null;
QueueReceiver receiver = null;
Queue queue = null;
Message message = null;
QueueConnection queueConnection = null;
QueueSession session = null;
QueueReceiver receiver = null;
Queue queue = null;
Message message = null;
try {
try {
queueConnection = connectionFactory.createQueueConnection( );
queueConnection = connectionFactory.createQueueConnection();
// create a transacted session
session = queueConnection.createQueueSession( TRANSACTED_RECEIVE, QueueSession.AUTO_ACKNOWLEDGE );
session = queueConnection.createQueueSession(TRANSACTED_RECEIVE, QueueSession.AUTO_ACKNOWLEDGE);
queue = session.createQueue(TEST_QUEUE_NAME);
receiver = session.createReceiver( queue );
receiver = session.createReceiver(queue);
// start the connection
queueConnection.start( );
queueConnection.start();
logger.info( "Receiver " + Thread.currentThread().getName() + " connected." );
logger.info("Receiver " + Thread.currentThread().getName() + " connected.");
// start receive loop
while( ! ( shouldStop.get() || Thread.currentThread().isInterrupted()) ) {
while (!(shouldStop.get() || Thread.currentThread().isInterrupted())) {
try {
message = receiver.receive( 200 );
} catch( Exception e) {
message = receiver.receive(200);
} catch (Exception e) {
//
// ignore interrupted exceptions
//
if( e instanceof InterruptedException || e.getCause() instanceof InterruptedException ) {
if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) {
/* ignore */
} else {
throw e;
}
}
if( message!=null && this.handler!=null ) {
if (message != null && this.handler != null) {
this.handler.onMessage(message);
}
// commit session on successful handling of message
if( session.getTransacted()) {
if (session.getTransacted()) {
session.commit();
}
}
logger.info( "Receiver " + Thread.currentThread().getName() + " shutting down." );
logger.info("Receiver " + Thread.currentThread().getName() + " shutting down.");
} finally {
if( receiver!=null ) {
if (receiver != null) {
try {
receiver.close();
} catch (JMSException e) {
} catch (JMSException e) {
logger.warn(e);
}
}
if( session!=null ) {
if (session != null) {
try {
session.close();
} catch (JMSException e) {
} catch (JMSException e) {
logger.warn(e);
}
}
if( queueConnection!=null ) {
if (queueConnection != null) {
queueConnection.close();
}
}
} catch ( JMSException e ) {
} catch (JMSException e) {
logger.error(e);
e.printStackTrace();
} catch (NamingException e) {
@ -297,18 +299,12 @@ public class AMQ1936Test extends TestCase{
}
}
public Boolean getShouldStop() {
return shouldStop.get();
}
public void setShouldStop(Boolean shouldStop) {
this.shouldStop.set(shouldStop);
}
}
public interface IMessageHandler {
void onMessage( Message message ) throws Exception;
void onMessage(Message message) throws Exception;
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@ -27,8 +30,6 @@ import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import junit.framework.Assert;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
@ -78,26 +79,26 @@ public class AMQ2213Test
@Test
public void testEqualsGenericSession() throws JMSException
{
Assert.assertNotNull(this.connection);
assertNotNull(this.connection);
Session sess = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Assert.assertTrue(sess.equals(sess));
assertTrue(sess.equals(sess));
}
@Test
public void testEqualsTopicSession() throws JMSException
{
Assert.assertNotNull(this.connection);
Assert.assertTrue(this.connection instanceof TopicConnection);
assertNotNull(this.connection);
assertTrue(this.connection instanceof TopicConnection);
TopicSession sess = ((TopicConnection)this.connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Assert.assertTrue(sess.equals(sess));
assertTrue(sess.equals(sess));
}
@Test
public void testEqualsQueueSession() throws JMSException
{
Assert.assertNotNull(this.connection);
Assert.assertTrue(this.connection instanceof QueueConnection);
assertNotNull(this.connection);
assertTrue(this.connection instanceof QueueConnection);
QueueSession sess = ((QueueConnection)this.connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Assert.assertTrue(sess.equals(sess));
assertTrue(sess.equals(sess));
}
}

View File

@ -16,18 +16,8 @@
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.perf.NumberOfDestinationsTest;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
@ -36,41 +26,49 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBStore;
/*
A AMQ2356Test
We have an environment where we have a very large number of destinations.
In an effort to reduce the number of threads I have set the options
-Dorg.apache.activemq.UseDedicatedTaskRunner=false
A AMQ2356Test
We have an environment where we have a very large number of destinations.
In an effort to reduce the number of threads I have set the options
-Dorg.apache.activemq.UseDedicatedTaskRunner=false
and
and
<policyEntry queue=">" optimizedDispatch="true"/>
<policyEntry queue=">" optimizedDispatch="true"/>
Unfortunately this very quickly leads to deadlocked queues.
Unfortunately this very quickly leads to deadlocked queues.
My environment is:
My environment is:
ActiveMQ 5.2 Ubunty Jaunty kernel 2.6.28-14-generic #47-Ubuntu SMP (although only a single core on my system)
TCP transportConnector
ActiveMQ 5.2 Ubunty Jaunty kernel 2.6.28-14-generic #47-Ubuntu SMP (although only a single core on my system)
TCP transportConnector
To reproduce the bug (which I can do 100% of the time) I connect 5 consumers (AUTO_ACK) to 5 different queues.
Then I start 5 producers and pair them up with a consumer on a queue, and they start sending PERSISTENT messages.
I've set the producer to send 100 messages and disconnect, and the consumer to receive 100 messages and disconnect.
The first pair usually gets through their 100 messages and disconnect, at which point all the other pairs have
deadlocked at less than 30 messages each.
To reproduce the bug (which I can do 100% of the time) I connect 5 consumers (AUTO_ACK) to 5 different queues.
Then I start 5 producers and pair them up with a consumer on a queue, and they start sending PERSISTENT messages.
I've set the producer to send 100 messages and disconnect, and the consumer to receive 100 messages and disconnect.
The first pair usually gets through their 100 messages and disconnect, at which point all the other pairs have
deadlocked at less than 30 messages each.
*/
public class AMQ2356Test extends TestCase {
protected static final int MESSAGE_COUNT = 1000;
protected static final int NUMBER_OF_PAIRS = 10;
private static final Logger LOG = LoggerFactory.getLogger(NumberOfDestinationsTest.class);
protected BrokerService broker;
protected String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected int destinationCount;
public void testScenario() throws Exception {
for (int i = 0; i < NUMBER_OF_PAIRS; i++) {
ActiveMQQueue queue = new ActiveMQQueue(getClass().getName()+":"+i);
ActiveMQQueue queue = new ActiveMQQueue(getClass().getName() + ":" + i);
ProducerConsumerPair cp = new ProducerConsumerPair();
cp.start(this.brokerURL, queue, MESSAGE_COUNT);
cp.testRun();
@ -83,6 +81,7 @@ public class AMQ2356Test extends TestCase {
return session.createQueue(destinationName);
}
@Override
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
@ -90,6 +89,7 @@ public class AMQ2356Test extends TestCase {
super.setUp();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
if (broker != null) {
@ -109,19 +109,20 @@ public class AMQ2356Test extends TestCase {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFileDir);
answer.setUseJmx(false);
// Setup a destination policy where it takes only 1 message at a time.
// Setup a destination policy where it takes only 1 message at a time.
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
policy.setOptimizedDispatch(true);
policyMap.setDefaultEntry(policy);
answer.setDestinationPolicy(policyMap);
answer.setAdvisorySupport(false);
answer.setEnableStatistics(false);
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector(brokerURL);
}
static class ProducerConsumerPair {
private Destination destination;
private MessageProducer producer;
@ -130,41 +131,40 @@ public class AMQ2356Test extends TestCase {
private Connection consumerConnection;
private int numberOfMessages;
ProducerConsumerPair(){
ProducerConsumerPair() {
}
void start(String brokerURL, final Destination dest, int msgNum) throws Exception {
this.destination=dest;
this.numberOfMessages=msgNum;
this.destination = dest;
this.numberOfMessages = msgNum;
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL);
this.producerConnection = cf.createConnection();
this.producerConnection.start();
this.consumerConnection = cf.createConnection();
this.consumerConnection.start();
this.producer=createProducer(this.producerConnection);
this.consumer=createConsumer(this.consumerConnection);
this.producer = createProducer(this.producerConnection);
this.consumer = createConsumer(this.consumerConnection);
}
void testRun() throws Exception {
Session s = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0 ; i < this.numberOfMessages; i++) {
BytesMessage msg = s.createBytesMessage();
msg.writeBytes(new byte[1024]);
this.producer.send(msg);
}
int received = 0;
for (int i = 0 ; i < this.numberOfMessages; i++) {
Message msg = this.consumer.receive();
assertNotNull(msg);
received++;
}
assertEquals("Messages received on " + this.destination,this.numberOfMessages,received);
Session s = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < this.numberOfMessages; i++) {
BytesMessage msg = s.createBytesMessage();
msg.writeBytes(new byte[1024]);
this.producer.send(msg);
}
int received = 0;
for (int i = 0; i < this.numberOfMessages; i++) {
Message msg = this.consumer.receive();
assertNotNull(msg);
received++;
}
assertEquals("Messages received on " + this.destination, this.numberOfMessages, received);
}
void stop() throws Exception {
if (this.producerConnection != null) {
this.producerConnection.close();
@ -179,9 +179,9 @@ public class AMQ2356Test extends TestCase {
MessageProducer result = session.createProducer(this.destination);
return result;
}
private MessageConsumer createConsumer(Connection connection) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer result = session.createConsumer(this.destination);
return result;

View File

@ -17,6 +17,8 @@
package org.apache.activemq.bugs;
//package org.apache.activemq.transport.failover;
import static org.junit.Assert.assertEquals;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Collection;
@ -31,8 +33,6 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Assert;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
@ -74,6 +74,7 @@ public class AMQ2364Test {
final CountDownLatch latch = new CountDownLatch(messageCount);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
try {
session.rollback();
@ -107,6 +108,6 @@ public class AMQ2364Test {
connection.stop();
connection.close();
Assert.assertEquals("Transaction states not cleaned up", 0,transactionStates.size());
assertEquals("Transaction states not cleaned up", 0,transactionStates.size());
}
}

View File

@ -68,7 +68,7 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
public boolean useVMCursor = false;
public boolean useOptimizeAcks = false;
private ArrayList<Service> services = new ArrayList<Service>(CONSUMER_COUNT + PRODUCER_COUNT);
private final ArrayList<Service> services = new ArrayList<Service>(CONSUMER_COUNT + PRODUCER_COUNT);
AtomicInteger count = new AtomicInteger(0);
Semaphore receivedMessages;
AtomicBoolean running = new AtomicBoolean(false);
@ -77,9 +77,10 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
addCombinationValues("deliveryMode", new Object[] { DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT });
addCombinationValues("ackMode", new Object[] { Session.DUPS_OK_ACKNOWLEDGE, Session.AUTO_ACKNOWLEDGE });
addCombinationValues("useVMCursor", new Object[] { true, false });
//addCombinationValues("useOptimizeAcks", new Object[] {true, false});
// addCombinationValues("useOptimizeAcks", new Object[] {true, false});
}
@Override
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test");
@ -105,21 +106,21 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
receivedMessages = new Semaphore(0);
factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2401");
//factory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false&broker.persistent=false");
// factory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false&broker.persistent=false");
setAutoFail(true);
super.setUp();
}
@Override
protected void tearDown() throws Exception {
running.set(false);
for(Service service : services)
{
for (Service service : services) {
service.close();
}
broker.stop();
broker.waitUntilStopped();
super.tearDown();
}
@ -156,9 +157,10 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
/*
* (non-Javadoc)
*
*
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
@Override
public void onMessage(Message message) {
receivedMessages.release();
if (count.incrementAndGet() % 100 == 0) {
@ -167,7 +169,7 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
track(message);
if (RECEIVER_THINK_TIME > 0) {
try {
Thread.currentThread().sleep(RECEIVER_THINK_TIME);
Thread.sleep(RECEIVER_THINK_TIME);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@ -176,11 +178,12 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
}
HashMap<ProducerId, boolean[]> tracker = new HashMap<ProducerId, boolean[]>();
private synchronized void track(Message message) {
try {
MessageId id = new MessageId(message.getJMSMessageID());
ProducerId pid = id.getProducerId();
int seq = (int)id.getProducerSequenceId();
int seq = (int) id.getProducerSequenceId();
boolean[] ids = tracker.get(pid);
if (ids == null) {
ids = new boolean[TO_SEND + 1];
@ -198,13 +201,14 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
/**
* @throws InterruptedException
* @throws TimeoutException
*
*
*/
private void waitForMessageReceipt() throws InterruptedException, TimeoutException {
try {
while (count.get() < SEND_COUNT) {
if (!receivedMessages.tryAcquire(HANG_THRESHOLD, TimeUnit.SECONDS)) {
if (count.get() == SEND_COUNT) break;
if (count.get() == SEND_COUNT)
break;
verifyTracking();
throw new TimeoutException("@count=" + count.get() + " Message not received for more than " + HANG_THRESHOLD + " seconds");
}
@ -218,7 +222,7 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
Vector<MessageId> missing = new Vector<MessageId>();
for (ProducerId pid : tracker.keySet()) {
boolean[] ids = tracker.get(pid);
for (int i=1; i<TO_SEND + 1; i++) {
for (int i = 1; i < TO_SEND + 1; i++) {
if (!ids[i]) {
missing.add(new MessageId(pid, i));
}
@ -236,34 +240,32 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
private class TestProducer implements Runnable, Service {
Thread thread;
BytesMessage message;
int id;
Connection connection;
Session session;
MessageProducer producer;
TestProducer(int id) throws Exception {
this.id = id;
thread = new Thread(this, "TestProducer-" + id);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
producer = session.createProducer(session.createQueue("AMQ2401Test"));
}
@Override
public void start() {
thread.start();
}
@Override
public void run() {
int i = 1;
for (; i <= TO_SEND; i++) {
try {
if (+i % 100 == 0) {
LOG.info(thread.currentThread().getName() + " Sending message " + i);
LOG.info(Thread.currentThread().getName() + " Sending message " + i);
}
message = session.createBytesMessage();
message.writeBytes(new byte[1024]);
@ -274,9 +276,10 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
break;
}
}
LOG.info(thread.currentThread().getName() + " Sent: " + (i-1));
LOG.info(Thread.currentThread().getName() + " Sent: " + (i - 1));
}
@Override
public void close() {
try {
connection.close();
@ -291,7 +294,6 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
ActiveMQConnection connection;
Session session;
MessageConsumer consumer;
Thread thread;
TestConsumer() throws Exception {
factory.setOptimizeAcknowledge(false);
@ -306,10 +308,12 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
consumer.setMessageListener(AMQ2413Test.this);
}
@Override
public void start() throws Exception {
connection.start();
}
@Override
public void close() {
try {
connection.close();
@ -321,9 +325,10 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
/*
* (non-Javadoc)
*
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
while (running.get()) {
try {
@ -332,12 +337,10 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
e.printStackTrace();
}
}
}
}
public static Test suite() {
return suite(AMQ2413Test.class);
}
return suite(AMQ2413Test.class);
}
}

View File

@ -27,6 +27,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.EmbeddedBrokerAndConnectionTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.spring.ConsumerBean;
public class AMQ2585Test extends EmbeddedBrokerAndConnectionTestSupport {
@ -58,7 +59,7 @@ public class AMQ2585Test extends EmbeddedBrokerAndConnectionTestSupport {
* lengths for the key name and value.
*/
final int sizeShouldBeNoLessThan = LENGTH10STRING.length() * 4 + received.DEFAULT_MINIMUM_MESSAGE_SIZE;
final int sizeShouldBeNoLessThan = LENGTH10STRING.length() * 4 + Message.DEFAULT_MINIMUM_MESSAGE_SIZE;
assertTrue("Message size was smaller than expected: " + received.getSize(),
received.getSize() >= sizeShouldBeNoLessThan);
assertFalse(LENGTH10STRING.length() * 2 == received.getSize());

View File

@ -20,7 +20,7 @@ import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
@ -63,6 +63,7 @@ public class AMQ2910Test extends JmsMultipleClientsTestSupport {
for (int i=0; i<maxConcurrency; i++) {
final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
executor.execute(new Runnable() {
@Override
public void run() {
try {
sendMessages(factory.createConnection(), dest, msgCount);
@ -75,15 +76,14 @@ public class AMQ2910Test extends JmsMultipleClientsTestSupport {
executor.shutdown();
assertTrue("send completed", executor.awaitTermination(60, TimeUnit.SECONDS));
assertNoExceptions();
executor = Executors.newCachedThreadPool();
for (int i=0; i<maxConcurrency; i++) {
final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
executor.execute(new Runnable() {
@Override
public void run() {
try {
startConsumers(factory, dest);

View File

@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage;
@ -96,6 +95,7 @@ public class AMQ2982Test {
.createQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
messageCountDown.countDown();
}

View File

@ -17,16 +17,20 @@
package org.apache.activemq.bugs;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
@ -35,33 +39,18 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test the loss of messages detected during testing with ActiveMQ 5.4.1 and 5.4.2.
* <p/>
* Symptoms:
* - 1 record is lost "early" in the stream.
* - no more records lost.
* Symptoms: - 1 record is lost "early" in the stream. - no more records lost.
* <p/>
* Test Configuration:
* - Broker Settings:
* - Destination Policy
* - Occurs with "Destination Policy" using Store Cursor and a memory limit
* - Not reproduced without "Destination Policy" defined
* - Persistence Adapter
* - Memory: Does not occur.
* - KahaDB: Occurs.
* - Messages
* - Occurs with TextMessage and BinaryMessage
* - Persistent messages.
* Test Configuration: - Broker Settings: - Destination Policy - Occurs with "Destination Policy" using Store Cursor and
* a memory limit - Not reproduced without "Destination Policy" defined - Persistence Adapter - Memory: Does not occur.
* - KahaDB: Occurs. - Messages - Occurs with TextMessage and BinaryMessage - Persistent messages.
* <p/>
* Notes:
* - Lower memory limits increase the rate of occurrence.
* - Higher memory limits may prevent the problem (probably because memory limits not reached).
* - Producers sending a number of messages before consumers come online increases rate of occurrence.
* Notes: - Lower memory limits increase the rate of occurrence. - Higher memory limits may prevent the problem
* (probably because memory limits not reached). - Producers sending a number of messages before consumers come online
* increases rate of occurrence.
*/
public class AMQ3167Test {
@ -79,25 +68,22 @@ public class AMQ3167Test {
protected Connection JMS_conn;
protected long Num_error = 0;
//// ////
//// UTILITIES ////
//// ////
// // ////
// // UTILITIES ////
// // ////
/**
* Create a new, unsecured, client connection to the test broker using the given username and password. This
* Create a new, unsecured, client connection to the test broker using the given username and password. This
* connection bypasses all security.
* <p/>
* Don't forget to start the connection or no messages will be received by consumers even though producers
* will work fine.
* Don't forget to start the connection or no messages will be received by consumers even though producers will work
* fine.
*
* @username name of the JMS user for the connection; may be null.
* @password Password for the JMS user; may be null.
*/
protected Connection createUnsecuredConnection(String username, String password)
throws javax.jms.JMSException {
protected Connection createUnsecuredConnection(String username, String password) throws javax.jms.JMSException {
ActiveMQConnectionFactory conn_fact;
conn_fact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI());
@ -105,15 +91,12 @@ public class AMQ3167Test {
return conn_fact.createConnection(username, password);
}
//// ////
//// TEST FUNCTIONALITY ////
//// ////
// // ////
// // TEST FUNCTIONALITY ////
// // ////
@Before
public void testPrep()
throws Exception {
public void testPrep() throws Exception {
embeddedBroker = new BrokerService();
configureBroker(embeddedBroker);
embeddedBroker.start();
@ -125,16 +108,12 @@ public class AMQ3167Test {
}
@After
public void testCleanup()
throws java.lang.Exception {
public void testCleanup() throws java.lang.Exception {
JMS_conn.stop();
embeddedBroker.stop();
}
protected void configureBroker(BrokerService broker_svc)
throws Exception {
TransportConnector conn;
protected void configureBroker(BrokerService broker_svc) throws Exception {
broker_svc.setBrokerName("testbroker1");
@ -144,7 +123,6 @@ public class AMQ3167Test {
configureDestinationPolicy(broker_svc);
}
/**
* NOTE: overrides any prior policy map defined for the broker service.
*/
@ -166,7 +144,6 @@ public class AMQ3167Test {
pol_ent.setProducerFlowControl(false);
ent_list.add(pol_ent);
//
// COMPLETE POLICY MAP
//
@ -177,14 +154,12 @@ public class AMQ3167Test {
broker_svc.setDestinationPolicy(pol_map);
}
//// ////
//// TEST ////
//// ////
// // ////
// // TEST ////
// // ////
@Test
public void testQueueLostMessage()
throws Exception {
public void testQueueLostMessage() throws Exception {
Destination dest;
dest = ActiveMQDestination.createDestination("lostmsgtest.queue", ActiveMQDestination.QUEUE_TYPE);
@ -201,7 +176,6 @@ public class AMQ3167Test {
assertTrue(Num_error == 0);
}
/**
*
*/
@ -211,14 +185,11 @@ public class AMQ3167Test {
java.lang.System.err.println(msg);
}
/**
* Main body of the lost-message test.
*/
protected void runLostMsgTest(Destination dest, int num_msg, int num_send_per_sess, int num_recv_per_sess,
boolean topic_f)
throws Exception {
protected void runLostMsgTest(Destination dest, int num_msg, int num_send_per_sess, int num_recv_per_sess, boolean topic_f) throws Exception {
Thread prod_thread;
Thread cons_thread;
String tag;
@ -227,7 +198,6 @@ public class AMQ3167Test {
MessageConsumer cons;
int ack_mode;
//
// Start the producer
//
@ -242,7 +212,6 @@ public class AMQ3167Test {
prod_thread.start();
log("Started producer " + tag);
//
// Delay before starting consumers
//
@ -250,7 +219,6 @@ public class AMQ3167Test {
log("Waiting before starting consumers");
java.lang.Thread.sleep(Consumer_startup_delay_ms);
//
// Now create and start the consumer
//
@ -270,7 +238,6 @@ public class AMQ3167Test {
cons_thread.start();
log("Started consumer " + tag);
//
// Wait for the producer and consumer to finish.
//
@ -284,14 +251,13 @@ public class AMQ3167Test {
log("Shutting down");
}
//// ////
//// INTERNAL CLASSES ////
//// ////
// // ////
// // INTERNAL CLASSES ////
// // ////
/**
* Producer thread - runs a single producer until the maximum number of messages is sent, the producer stop
* time is reached, or a test error is detected.
* Producer thread - runs a single producer until the maximum number of messages is sent, the producer stop time is
* reached, or a test error is detected.
*/
protected class producerThread extends Thread {
@ -313,16 +279,14 @@ public class AMQ3167Test {
numPerSess = sess_size;
}
public void execTest()
throws Exception {
public void execTest() throws Exception {
Message msg;
int sess_start;
int cur;
sess_start = 0;
cur = 0;
while ((cur < numMsg) && (!didTimeOut()) &&
((!Stop_after_error) || (Num_error == 0))) {
while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) {
msg = msgSess.createTextMessage("test message from " + producerTag);
msg.setStringProperty("testprodtag", producerTag);
msg.setIntProperty("seq", cur);
@ -331,7 +295,6 @@ public class AMQ3167Test {
((ActiveMQMessage) msg).setResponseRequired(true);
}
//
// Send the message.
//
@ -339,10 +302,9 @@ public class AMQ3167Test {
msgProd.send(msg);
cur++;
//
// Commit if the number of messages per session has been reached, and
// transactions are being used (only when > 1 msg per sess).
// transactions are being used (only when > 1 msg per sess).
//
if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
@ -356,11 +318,9 @@ public class AMQ3167Test {
msgSess.commit();
if (cur < numMsg)
log("* Producer " + producerTag + " timed out at " + java.lang.System.nanoTime() +
" (stop time " + producer_stop_time + ")");
log("* Producer " + producerTag + " timed out at " + java.lang.System.nanoTime() + " (stop time " + producer_stop_time + ")");
}
/**
* Check whether it is time for the producer to terminate.
*/
@ -395,10 +355,9 @@ public class AMQ3167Test {
}
}
/**
* Producer thread - runs a single consumer until the maximum number of messages is received, the consumer stop
* time is reached, or a test error is detected.
* Producer thread - runs a single consumer until the maximum number of messages is received, the consumer stop time
* is reached, or a test error is detected.
*/
protected class consumerThread extends Thread {
@ -418,8 +377,7 @@ public class AMQ3167Test {
numPerSess = sess_size;
}
public void execTest()
throws Exception {
public void execTest() throws Exception {
Message msg;
int sess_start;
int cur;
@ -428,8 +386,7 @@ public class AMQ3167Test {
sess_start = 0;
cur = 0;
while ((cur < numMsg) && (!didTimeOut()) &&
((!Stop_after_error) || (Num_error == 0))) {
while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) {
//
// Use a timeout of 1 second to periodically check the consumer timeout.
//
@ -453,7 +410,6 @@ public class AMQ3167Test {
log("* Consumer " + consumerTag + " timed out");
}
/**
* Check whether it is time for the consumer to terminate.
*/
@ -465,14 +421,12 @@ public class AMQ3167Test {
return false;
}
/**
* Verify the message received. Sequence numbers are checked and are expected to exactly match the
* message number (starting at 0).
* Verify the message received. Sequence numbers are checked and are expected to exactly match the message
* number (starting at 0).
*/
protected void checkMessage(Message msg, int exp_seq)
throws javax.jms.JMSException {
protected void checkMessage(Message msg, int exp_seq) throws javax.jms.JMSException {
int seq;
seq = msg.getIntProperty("seq");
@ -483,7 +437,6 @@ public class AMQ3167Test {
}
}
/**
* Run the consumer.
*/

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
@ -25,8 +27,6 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
import org.junit.Before;
import org.junit.Test;
@ -59,7 +59,7 @@ public class AMQ3537Test implements InvocationHandler, Serializable {
public static final Class[] TEST_CLASSES = new Class[] { List.class, NonJDKList.class, Serializable.class };
/** Underlying list */
private List l = new ArrayList<String>();
private final List l = new ArrayList<String>();
@Before
public void setUp() throws Exception {
@ -87,8 +87,10 @@ public class AMQ3537Test implements InvocationHandler, Serializable {
// in ClassLoadingAwareObjectInputStream
List deserializedProxy = (List) claois.readObject();
claois.close();
// assert the invocation worked
Assert.assertEquals("foo", deserializedProxy.get(0));
assertEquals("foo", deserializedProxy.get(0));
}
@Override

View File

@ -18,10 +18,6 @@ package org.apache.activemq.bugs;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
@ -37,11 +33,8 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.WriterAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Before;
import org.junit.Test;
@ -96,12 +89,10 @@ public class AMQ3567Test {
}
};
log4jLogger.addAppender(appender);
Level level = log4jLogger.getLevel();
log4jLogger.setLevel(Level.DEBUG);
BufferedReader read = null;
try {
stopConsumer();
stopBroker();

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertNotNull;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@ -26,6 +28,7 @@ import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
@ -38,10 +41,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class AMQ3903Test {
private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3903Test.class);

View File

@ -40,12 +40,9 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQ3961Test {
private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3934Test.class);
private static BrokerService brokerService;
private static String BROKER_ADDRESS = "tcp://localhost:0";
@ -73,7 +70,7 @@ public class AMQ3961Test {
public class TestServerSessionPool implements ServerSessionPool {
private TopicConnection connection;
private final TopicConnection connection;
public TestServerSessionPool(final TopicConnection connection) {
this.connection = connection;
@ -88,7 +85,7 @@ public class AMQ3961Test {
public class TestServerSession implements ServerSession, MessageListener {
private TopicSession session;
private final TopicSession session;
public TestServerSession(final TopicSession session) throws JMSException {
this.session = session;
@ -114,8 +111,8 @@ public class AMQ3961Test {
}
public static final int MESSAGE_COUNT = 16;
private List<TestServerSession> processedSessions = new LinkedList<TestServerSession>();
private List<TestServerSession> committedSessions = new LinkedList<TestServerSession>();
private final List<TestServerSession> processedSessions = new LinkedList<TestServerSession>();
private final List<TestServerSession> committedSessions = new LinkedList<TestServerSession>();
@Test
public void testPrefetchInDurableSubscription() throws Exception {

View File

@ -173,6 +173,7 @@ public class AMQ4062Test {
assertEquals(5, info.getPrefetchSize());
}
@SuppressWarnings("unchecked")
private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException {
if(durableSubscriptions!=null) return durableSubscriptions;
RegionBroker regionBroker=(RegionBroker)service.getRegionBroker();
@ -197,8 +198,8 @@ public class AMQ4062Test {
public class PrefetchConsumer implements MessageListener{
public static final String SUBSCRIPTION_NAME = "A_NAME_ABC_DEF";
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private final String user = ActiveMQConnection.DEFAULT_USER;
private final String password = ActiveMQConnection.DEFAULT_PASSWORD;
private final String uri;
private boolean transacted;
ActiveMQConnection connection;
@ -224,6 +225,7 @@ public class AMQ4062Test {
consumer.setMessageListener(this);
}
@Override
public void onMessage(Message message) {
try {
a.await();
@ -245,7 +247,7 @@ public class AMQ4062Test {
protected final String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private final String password = ActiveMQConnection.DEFAULT_PASSWORD;
private final String uri;
private boolean transacted;

View File

@ -42,6 +42,7 @@ public class AMQ4213Test {
private String connectionUri;
@SuppressWarnings("unchecked")
@Before
public void setUp() throws Exception {
brokerService = new BrokerService();

View File

@ -16,9 +16,29 @@
*/
package org.apache.activemq.bugs;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.*;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.transport.vm.VMTransportFactory;
@ -26,14 +46,6 @@ import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.*;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
*/
@ -48,7 +60,6 @@ public class AMQ4222Test extends TestSupport {
super.setUp();
topic = false;
brokerService = createBroker();
}
@Override
@ -160,6 +171,7 @@ public class AMQ4222Test extends TestSupport {
}
@SuppressWarnings("unchecked")
private Map<ProducerId, ProducerBrokerExchange> getProducerExchangeFromConn(TransportConnection transportConnection) throws NoSuchFieldException, IllegalAccessException {
Field f = TransportConnection.class.getDeclaredField("producerExchanges");
f.setAccessible(true);
@ -168,11 +180,9 @@ public class AMQ4222Test extends TestSupport {
return producerExchanges;
}
private Message createRequest(Session session, Destination replyTo) throws JMSException {
Message message = session.createTextMessage("Payload");
message.setJMSReplyTo(replyTo);
return message;
}
}

View File

@ -25,9 +25,9 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
public class MissingDataFileTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(MissingDataFileTest.class);
private static int counter = 500;
private static int hectorToHaloCtr;
@ -55,13 +55,13 @@ public class MissingDataFileTest extends TestCase {
private static int haloToXenaCtr;
private static int haloToTroyCtr;
private String hectorToHalo = "hectorToHalo";
private String xenaToHalo = "xenaToHalo";
private String troyToHalo = "troyToHalo";
private final String hectorToHalo = "hectorToHalo";
private final String xenaToHalo = "xenaToHalo";
private final String troyToHalo = "troyToHalo";
private String haloToHector = "haloToHector";
private String haloToXena = "haloToXena";
private String haloToTroy = "haloToTroy";
private final String haloToHector = "haloToHector";
private final String haloToXena = "haloToXena";
private final String haloToTroy = "haloToTroy";
private BrokerService broker;
@ -74,7 +74,7 @@ public class MissingDataFileTest extends TestCase {
private final Object lock = new Object();
final boolean useTopic = false;
final boolean useSleep = true;
protected static final String payload = new String(new byte[500]);
public Connection createConnection() throws JMSException {
@ -92,21 +92,22 @@ public class MissingDataFileTest extends TestCase {
broker.setPersistent(true);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616").setName("Default");
SystemUsage systemUsage;
systemUsage = new SystemUsage();
systemUsage.getMemoryUsage().setLimit(10 * 1024 * 1024); // Just a few messags
broker.setSystemUsage(systemUsage);
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
kahaDBPersistenceAdapter.setJournalMaxFileLength(16*1024);
kahaDBPersistenceAdapter.setCleanupInterval(500);
broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
broker.start();
LOG.info("Starting broker..");
}
@Override
public void tearDown() throws Exception {
hectorConnection.close();
xenaConnection.close();
@ -116,11 +117,12 @@ public class MissingDataFileTest extends TestCase {
}
public void testForNoDataFoundError() throws Exception {
startBroker();
hectorConnection = createConnection();
Thread hectorThread = buildProducer(hectorConnection, hectorToHalo, false, useTopic);
Receiver hHectorReceiver = new Receiver() {
@Override
public void receive(String s) throws Exception {
haloToHectorCtr++;
if (haloToHectorCtr >= counter) {
@ -136,6 +138,7 @@ public class MissingDataFileTest extends TestCase {
troyConnection = createConnection();
Thread troyThread = buildProducer(troyConnection, troyToHalo);
Receiver hTroyReceiver = new Receiver() {
@Override
public void receive(String s) throws Exception {
haloToTroyCtr++;
if (haloToTroyCtr >= counter) {
@ -151,6 +154,7 @@ public class MissingDataFileTest extends TestCase {
xenaConnection = createConnection();
Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
Receiver hXenaReceiver = new Receiver() {
@Override
public void receive(String s) throws Exception {
haloToXenaCtr++;
if (haloToXenaCtr >= counter) {
@ -168,6 +172,7 @@ public class MissingDataFileTest extends TestCase {
final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection, false);
final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection, false);
Receiver hectorReceiver = new Receiver() {
@Override
public void receive(String s) throws Exception {
hectorToHaloCtr++;
troySender.send(payload);
@ -180,6 +185,7 @@ public class MissingDataFileTest extends TestCase {
}
};
Receiver xenaReceiver = new Receiver() {
@Override
public void receive(String s) throws Exception {
xenaToHaloCtr++;
hectorSender.send(payload);
@ -192,6 +198,7 @@ public class MissingDataFileTest extends TestCase {
}
};
Receiver troyReceiver = new Receiver() {
@Override
public void receive(String s) throws Exception {
troyToHaloCtr++;
xenaSender.send(payload);
@ -239,7 +246,7 @@ public class MissingDataFileTest extends TestCase {
Thread.sleep(5000);
}
}
}
protected void waitForMessagesToBeDelivered() {
@ -272,10 +279,11 @@ public class MissingDataFileTest extends TestCase {
public Thread buildProducer(Connection connection, final String queueName) throws Exception {
return buildProducer(connection, queueName, false, false);
}
public Thread buildProducer(Connection connection, final String queueName, boolean transacted, boolean isTopic) throws Exception {
final MessageSender producer = new MessageSender(queueName, connection, transacted, isTopic);
Thread thread = new Thread() {
@Override
public synchronized void run() {
for (int i = 0; i < counter; i++) {
try {
@ -294,6 +302,7 @@ public class MissingDataFileTest extends TestCase {
MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName) : session.createQueue(queueName));
MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
ObjectMessage objectMessage = (ObjectMessage)message;

View File

@ -40,21 +40,24 @@ public class VerifySteadyEnqueueRate extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(VerifySteadyEnqueueRate.class);
private static int max_messages = 1000000;
private String destinationName = getName() + "_Queue";
private final String destinationName = getName() + "_Queue";
private BrokerService broker;
final boolean useTopic = false;
private boolean useAMQPStore = false;
private final boolean useAMQPStore = false;
protected static final String payload = new String(new byte[24]);
@Override
public void setUp() throws Exception {
startBroker();
}
@Override
public void tearDown() throws Exception {
broker.stop();
}
@SuppressWarnings("unused")
public void testEnqueueRateCanMeetSLA() throws Exception {
if (true) {
return;
@ -68,9 +71,10 @@ public class VerifySteadyEnqueueRate extends TestCase {
final AtomicLong slaViolations = new AtomicLong(0);
final AtomicLong max = new AtomicLong(0);
final int numThreads = 6;
Runnable runner = new Runnable() {
@Override
public void run() {
try {
MessageSender producer = new MessageSender(destinationName,
@ -83,13 +87,13 @@ public class VerifySteadyEnqueueRate extends TestCase {
long duration = endT - startT;
total.incrementAndGet();
if (duration > max.get()) {
max.set(duration);
}
if (duration > min) {
slaViolations.incrementAndGet();
slaViolations.incrementAndGet();
System.err.println("SLA violation @ "+Thread.currentThread().getName()
+ " "
+ DateFormat.getTimeInstance().format(
@ -107,11 +111,11 @@ public class VerifySteadyEnqueueRate extends TestCase {
}
};
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < numThreads; i++) {
executor.execute(runner);
}
executor.shutdown();
while(!executor.isTerminated()) {
executor.awaitTermination(10, TimeUnit.SECONDS);
@ -149,18 +153,18 @@ public class VerifySteadyEnqueueRate extends TestCase {
} else {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File("target/activemq-data/kahadb"));
// The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
// The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
// what happens if the index is updated but a journal update is lost.
// Index is going to be in consistent, but can it be repaired?
kaha.setEnableJournalDiskSyncs(false);
// Using a bigger journal file size makes he take fewer spikes as it is not switching files as often.
kaha.setJournalMaxFileLength(1024*1024*100);
// small batch means more frequent and smaller writes
kaha.setIndexWriteBatchSize(100);
// do the index write in a separate thread
kaha.setEnableIndexWriteAsync(true);
broker.setPersistenceAdapter(kaha);
}