diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java index e69e2930e4..469f05f51c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java @@ -17,6 +17,8 @@ package org.apache.activemq.bugs; +import java.util.Timer; +import java.util.TimerTask; import java.util.Vector; import junit.framework.TestCase; @@ -33,45 +35,54 @@ import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +interface Configurer { + public void configure(BrokerService broker) throws Exception; +} + public class AMQ2149Test extends TestCase { - private static final Log log = LogFactory.getLog(AMQ2149Test.class); + private static final Log LOG = LogFactory.getLog(AMQ2149Test.class); - private String BROKER_URL; + private static final long BROKER_STOP_PERIOD = 15 * 1000; + + private static final String BROKER_CONNECTOR = "tcp://localhost:61617"; + private static final String BROKER_URL = "failover:("+ BROKER_CONNECTOR + +")?maxReconnectDelay=1000&useExponentialBackOff=false"; + private final String SEQ_NUM_PROPERTY = "seqNum"; final int MESSAGE_LENGTH_BYTES = 75000; final int MAX_TO_SEND = 2000; final long SLEEP_BETWEEN_SEND_MS = 5; final int NUM_SENDERS_AND_RECEIVERS = 10; + final Object brokerLock = new Object(); BrokerService broker; Vector exceptions = new Vector(); - public void setUp() throws Exception { + public void createBroker(Configurer configurer) throws Exception { broker = new BrokerService(); - broker.addConnector("tcp://localhost:0"); - broker.deleteAllMessages(); - - SystemUsage usage = new SystemUsage(); - MemoryUsage memoryUsage = new MemoryUsage(); - memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS); - usage.setMemoryUsage(memoryUsage); - broker.setSystemUsage(usage); + broker.setDataDirectory("target/amq-data/" + getName()); + broker.addConnector(BROKER_CONNECTOR); + if (configurer != null) { + configurer.configure(broker); + } + broker.setBrokerName(getName()); broker.start(); - - BROKER_URL = "failover:(" - + broker.getTransportConnectors().get(0).getUri() - +")?maxReconnectDelay=1000&useExponentialBackOff=false"; } public void tearDown() throws Exception { - broker.stop(); + synchronized(brokerLock) { + broker.stop(); + broker.waitUntilStopped(); + } + exceptions.clear(); } private String buildLongString() { @@ -94,6 +105,8 @@ public class AMQ2149Test extends TestCase { private final MessageConsumer messageConsumer; private volatile long nextExpectedSeqNum = 0; + + private String lastId = null; public Receiver(String queueName) throws JMSException { this.queueName = queueName; @@ -106,21 +119,33 @@ public class AMQ2149Test extends TestCase { connection.start(); } + public void close() throws JMSException { + connection.close(); + } + + public long getNextExpectedSeqNo() { + return nextExpectedSeqNum; + } + public void onMessage(Message message) { try { final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); if ((seqNum % 100) == 0) { - log.info(queueName + " received " + seqNum); + LOG.info(queueName + " received " + seqNum); } if (seqNum != nextExpectedSeqNum) { - log.warn(queueName + " received " + seqNum + " expected " - + nextExpectedSeqNum); + LOG.warn(queueName + " received " + seqNum + + " in msg: " + message.getJMSMessageID() + + " expected " + + nextExpectedSeqNum + + ", lastId: " + lastId); fail(queueName + " received " + seqNum + " expected " + nextExpectedSeqNum); } ++nextExpectedSeqNum; + lastId = message.getJMSMessageID(); } catch (Throwable e) { - log.error(queueName + " onMessage error", e); + LOG.error(queueName + " onMessage error", e); exceptions.add(e); } } @@ -161,38 +186,150 @@ public class AMQ2149Test extends TestCase { ++nextSequenceNumber; messageProducer.send(message); } catch (Exception e) { - log.error(queueName + " send error", e); + LOG.error(queueName + " send error", e); exceptions.add(e); } try { Thread.sleep(SLEEP_BETWEEN_SEND_MS); } catch (InterruptedException e) { - log.warn(queueName + " sleep interrupted", e); + LOG.warn(queueName + " sleep interrupted", e); } } + try { + connection.close(); + } catch (JMSException ignored) { + } } } - public void testOutOfOrderWithMemeUsageLimit() throws Exception { + public void testOrderWithMemeUsageLimit() throws Exception { + + createBroker(new Configurer() { + public void configure(BrokerService broker) throws Exception { + SystemUsage usage = new SystemUsage(); + MemoryUsage memoryUsage = new MemoryUsage(); + memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS); + usage.setMemoryUsage(memoryUsage); + broker.setSystemUsage(usage); + + broker.deleteAllMessages(); + } + }); + + verifyOrderedMessageReceipt(); + } + + public void testOrderWithRestartVMIndex() throws Exception { + createBroker(new Configurer() { + public void configure(BrokerService broker) throws Exception { + AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory(); + persistenceFactory.setPersistentIndex(false); + broker.setPersistenceFactory(persistenceFactory); + broker.deleteAllMessages(); + } + }); + + final Timer timer = new Timer(); + schedualRestartTask(timer, new Configurer() { + public void configure(BrokerService broker) throws Exception { + AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory(); + persistenceFactory.setPersistentIndex(false); + broker.setPersistenceFactory(persistenceFactory); + } + }); + + try { + verifyOrderedMessageReceipt(); + } finally { + timer.cancel(); + } + } + + + public void x_testOrderWithRestartWithForceRecover() throws Exception { + createBroker(new Configurer() { + public void configure(BrokerService broker) throws Exception { + AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory(); + persistenceFactory.setForceRecoverReferenceStore(true); + broker.setPersistenceFactory(persistenceFactory); + broker.deleteAllMessages(); + } + }); + + final Timer timer = new Timer(); + schedualRestartTask(timer, new Configurer() { + public void configure(BrokerService broker) throws Exception { + AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory(); + persistenceFactory.setForceRecoverReferenceStore(true); + broker.setPersistenceFactory(persistenceFactory); + } + }); + + try { + verifyOrderedMessageReceipt(); + } finally { + timer.cancel(); + } + } + + private void schedualRestartTask(Timer timer) { + schedualRestartTask(timer, null); + } + + private void schedualRestartTask(final Timer timer, final Configurer configurer) { + timer.schedule(new TimerTask() { + public void run() { + synchronized (brokerLock) { + LOG.info("stopping broker.."); + try { + broker.stop(); + } catch (Exception e) { + LOG.error("ex on broker stop", e); + exceptions.add(e); + } + LOG.info("restarting broker"); + try { + createBroker(configurer); + } catch (Exception e) { + LOG.error("ex on broker restart", e); + exceptions.add(e); + } + } + // do once + // timer.cancel(); + } + }, BROKER_STOP_PERIOD, BROKER_STOP_PERIOD); + } + + private void verifyOrderedMessageReceipt() throws Exception { + Vector threads = new Vector(); + Vector receivers = new Vector(); for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) { final String queueName = "test.queue." + i; - new Receiver(queueName); + receivers.add(new Receiver(queueName)); Thread thread = new Thread(new Sender(queueName)); thread.start(); threads.add(thread); } final long expiry = System.currentTimeMillis() + 1000 * 60 * 5; - while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) { + while(!threads.isEmpty() && !receivers.isEmpty() + && exceptions.isEmpty() && System.currentTimeMillis() < expiry) { Thread sendThread = threads.firstElement(); sendThread.join(1000*10); if (!sendThread.isAlive()) { threads.remove(sendThread); } + + Receiver receiver = receivers.firstElement(); + if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND) { + receiver.close(); + receivers.remove(receiver); + } } - assertTrue("No timeout waiting for senders to complete", System.currentTimeMillis() < expiry); + assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry); assertTrue("No exceptions", exceptions.isEmpty()); }