diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 5ec51e2d1b..90b28cc544 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -1226,7 +1226,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon if (er.getException() instanceof JMSException) { throw (JMSException)er.getException(); } else { + try { throw JMSExceptionSupport.create(er.getException()); + }catch(Throwable e) { + LOG.error("Caught an exception trying to create a JMSException",e); + } } } return response; diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java index a6e096f2dd..024301a25f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java @@ -71,6 +71,7 @@ public class AsyncDataManager { public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive"; public static final String DEFAULT_FILE_PREFIX = "data-"; public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; + public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30; private static final Log LOG = LogFactory.getLog(AsyncDataManager.class); @@ -188,7 +189,7 @@ public class AsyncDataManager { cleanup(); } }; - Scheduler.executePeriodically(cleanupTask, 1000 * 30); + Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL); } public void lock() throws IOException { @@ -272,6 +273,7 @@ public class AsyncDataManager { if (currentWriteFile != null) { currentWriteFile.linkAfter(nextWriteFile); if (currentWriteFile.isUnused()) { + System.err.println("remove current file unused:" + currentWriteFile); removeDataFile(currentWriteFile); } } @@ -298,7 +300,7 @@ public class AsyncDataManager { DataFile dataFile = fileMap.get(key); if (dataFile == null) { LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); - throw new IOException("Could not locate data file " + filePrefix + "-" + item.getDataFileId()); + throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId()); } return dataFile; } @@ -308,7 +310,7 @@ public class AsyncDataManager { DataFile dataFile = fileMap.get(key); if (dataFile == null) { LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); - throw new IOException("Could not locate data file " + filePrefix + "-" + item.getDataFileId()); + throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId()); } return dataFile.getFile(); } @@ -411,7 +413,9 @@ public class AsyncDataManager { purgeList.add(dataFile); } for (DataFile dataFile : purgeList) { - forceRemoveDataFile(dataFile); + if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) { + forceRemoveDataFile(dataFile); + } } } @@ -463,11 +467,11 @@ public class AsyncDataManager { dataFile.unlink(); if (archiveDataLogs) { dataFile.move(getDirectoryArchive()); - LOG.debug("moced data file " + dataFile + " to " + LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive()); } else { boolean result = dataFile.delete(); - LOG.debug("discarding data file " + dataFile + LOG.info("discarding data file " + dataFile + (result ? "successful " : "failed")); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java index 50422c0aa2..716f8a00f3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java @@ -113,6 +113,7 @@ public class AMQMessageStore implements MessageStore { if (debug) { LOG.debug("Journalled message add for: " + id + ", at: " + location); } + this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId()); addMessage(message, location); } else { if (debug) { @@ -164,7 +165,6 @@ public class AMQMessageStore implements MessageStore { try { lastLocation = location; messages.put(message.getMessageId(), data); - this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId()); }finally { lock.unlock(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index 6bdd7dbeb6..33c5f3e1b6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -90,13 +90,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, private static final boolean BROKEN_FILE_LOCK; private static final boolean DISABLE_LOCKING; private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; - private AsyncDataManager asyncDataManager; private ReferenceStoreAdapter referenceStoreAdapter; private TaskRunnerFactory taskRunnerFactory; private WireFormat wireFormat = new OpenWireFormat(); private SystemUsage usageManager; - private long cleanupInterval = 1000 * 30; private long checkpointInterval = 1000 * 60; private int maxCheckpointMessageAddSize = 1024 * 4; private AMQTransactionStore transactionStore = new AMQTransactionStore(this); @@ -116,6 +114,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, private boolean persistentIndex=true; private boolean useNio = true; private boolean archiveDataLogs=false; + private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL; private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH; private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE; private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; @@ -425,8 +424,12 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, } Integer lastDataFile = asyncDataManager.getCurrentDataFileId(); inProgress.add(lastDataFile); - Set inUse = new HashSet(referenceStoreAdapter.getReferenceFileIdsInUse()); - asyncDataManager.consolidateDataFilesNotIn(inUse, inProgress); + inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse()); + Location lastActiveTx = transactionStore.checkpoint(); + if (lastActiveTx != null) { + lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId()); + } + asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1); } catch (IOException e) { LOG.error("Could not cleanup data files: " + e, e); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java index a30fa30d71..a06c8e917b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java @@ -44,6 +44,7 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { private boolean persistentIndex=true; private boolean useNio = true; private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH; + private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL; /** @@ -60,9 +61,18 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { result.setReferenceStoreAdapter(getReferenceStoreAdapter()); result.setUseNio(isUseNio()); result.setMaxFileLength(getMaxFileLength()); + result.setCleanupInterval(getCleanupInterval()); return result; } + public long getCleanupInterval() { + return cleanupInterval; + } + + public void setCleanupInterval(long val) { + cleanupInterval = val; + } + /** * @return the dataDirectory */ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java index 69e834d818..afa39ab98b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java @@ -18,9 +18,12 @@ package org.apache.activemq.store.amq; import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Set; import org.apache.activemq.command.JournalTopicAck; import org.apache.activemq.command.JournalTransaction; @@ -230,13 +233,13 @@ public class AMQTransactionStore implements TransactionStore { // But we keep track of the first location of an operation // that was associated with an active tx. The journal can not // roll over active tx records. - Location rc = null; + Location minimumLocationInUse = null; synchronized (inflightTransactions) { for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) { AMQTx tx = iter.next(); Location location = tx.getLocation(); - if (rc == null || rc.compareTo(location) < 0) { - rc = location; + if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) { + minimumLocationInUse = location; } } } @@ -244,11 +247,11 @@ public class AMQTransactionStore implements TransactionStore { for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) { AMQTx tx = iter.next(); Location location = tx.getLocation(); - if (rc == null || rc.compareTo(location) < 0) { - rc = location; + if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) { + minimumLocationInUse = location; } } - return rc; + return minimumLocationInUse; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java index 490acb6315..534f50cca5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java @@ -234,8 +234,8 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements * @throws IOException * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse() */ - public Set getReferenceFileIdsInUse() throws IOException { - return recordReferences.keySet(); + public synchronized Set getReferenceFileIdsInUse() throws IOException { + return new HashSet(recordReferences.keySet()); } /** diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java b/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java index 903a476ece..36c29282ad 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java @@ -1,41 +1,41 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -public class MessageSender { - private MessageProducer producer; - private Session session; - - public MessageSender(String queueName, Connection connection, boolean useTransactedSession) throws Exception { - session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(session.createQueue(queueName)); - } - - public void send(String payload) throws Exception { - ObjectMessage message = session.createObjectMessage(); - message.setObject(payload); - producer.send(message); - if (session.getTransacted()) { - session.commit(); - } - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +public class MessageSender { + private MessageProducer producer; + private Session session; + + public MessageSender(String queueName, Connection connection, boolean useTransactedSession, boolean topic) throws Exception { + session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(topic ? session.createTopic(queueName) : session.createQueue(queueName)); + } + + public void send(String payload) throws Exception { + ObjectMessage message = session.createObjectMessage(); + message.setObject(payload); + producer.send(message); + if (session.getTransacted()) { + session.commit(); + } + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java new file mode 100644 index 0000000000..67008a830f --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; +import org.apache.activemq.usage.SystemUsage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/* + * Try and replicate: + * Caused by: java.io.IOException: Could not locate data file data--188 + * at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:302) + * at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:614) + * at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:523) + */ + +public class MissingDataFileTest extends TestCase { + + private static final Log LOG = LogFactory.getLog(MissingDataFileTest.class); + + private static int counter = 300; + + private static int hectorToHaloCtr; + private static int xenaToHaloCtr; + private static int troyToHaloCtr; + + private static int haloToHectorCtr; + private static int haloToXenaCtr; + private static int haloToTroyCtr; + + private String hectorToHalo = "hectorToHalo"; + private String xenaToHalo = "xenaToHalo"; + private String troyToHalo = "troyToHalo"; + + private String haloToHector = "haloToHector"; + private String haloToXena = "haloToXena"; + private String haloToTroy = "haloToTroy"; + + + private BrokerService broker; + + private Connection hectorConnection; + private Connection xenaConnection; + private Connection troyConnection; + private Connection haloConnection; + + private final Object lock = new Object(); + final boolean useTopic = false; + final boolean useSleep = true; + + protected static final String payload = new String(new byte[500]); + + public Connection createConnection() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + return factory.createConnection(); + } + + public Session createSession(Connection connection, boolean transacted) throws JMSException { + return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + } + + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.addConnector("tcp://localhost:61616").setName("Default"); + + SystemUsage systemUsage; + systemUsage = new SystemUsage(); + systemUsage.getMemoryUsage().setLimit(1024 * 10); // Just a few messags + broker.setSystemUsage(systemUsage); + + AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory(); + factory.setMaxFileLength(2*1024); // ~4 messages + factory.setCleanupInterval(5000); // every few second + + broker.start(); + LOG.info("Starting broker.."); + } + + public void tearDown() throws Exception { + hectorConnection.close(); + xenaConnection.close(); + troyConnection.close(); + haloConnection.close(); + broker.stop(); + } + + public void testForNoDataFoundError() throws Exception { + + startBroker(); + hectorConnection = createConnection(); + Thread hectorThread = buildProducer(hectorConnection, hectorToHalo, false, useTopic); + Receiver hHectorReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToHectorCtr++; + if (haloToHectorCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + possiblySleep(haloToHectorCtr); + } + }; + buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver, useTopic); + + troyConnection = createConnection(); + Thread troyThread = buildProducer(troyConnection, troyToHalo); + Receiver hTroyReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToTroyCtr++; + if (haloToTroyCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + possiblySleep(haloToTroyCtr); + } + }; + buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver, false); + + xenaConnection = createConnection(); + Thread xenaThread = buildProducer(xenaConnection, xenaToHalo); + Receiver hXenaReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToXenaCtr++; + if (haloToXenaCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + possiblySleep(haloToXenaCtr); + } + }; + buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver, false); + + haloConnection = createConnection(); + final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection, false); + final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection, false); + final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection, false); + Receiver hectorReceiver = new Receiver() { + public void receive(String s) throws Exception { + hectorToHaloCtr++; + troySender.send(payload); + if (hectorToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + possiblySleep(hectorToHaloCtr); + } + } + }; + Receiver xenaReceiver = new Receiver() { + public void receive(String s) throws Exception { + xenaToHaloCtr++; + hectorSender.send(payload); + if (xenaToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + possiblySleep(xenaToHaloCtr); + } + }; + Receiver troyReceiver = new Receiver() { + public void receive(String s) throws Exception { + troyToHaloCtr++; + xenaSender.send(payload); + if (troyToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver, false); + buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver, false); + buildReceiver(haloConnection, troyToHalo, true, troyReceiver, false); + + haloConnection.start(); + + troyConnection.start(); + troyThread.start(); + + xenaConnection.start(); + xenaThread.start(); + + hectorConnection.start(); + hectorThread.start(); + waitForMessagesToBeDelivered(); + // number of messages received should match messages sent + assertEquals(hectorToHaloCtr, counter); + LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages"); + assertEquals(xenaToHaloCtr, counter); + LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages"); + assertEquals(troyToHaloCtr, counter); + LOG.info("troyToHalo received " + troyToHaloCtr + " messages"); + assertEquals(haloToHectorCtr, counter); + LOG.info("haloToHector received " + haloToHectorCtr + " messages"); + assertEquals(haloToXenaCtr, counter); + LOG.info("haloToXena received " + haloToXenaCtr + " messages"); + assertEquals(haloToTroyCtr, counter); + LOG.info("haloToTroy received " + haloToTroyCtr + " messages"); + + } + + protected void possiblySleep(int count) throws InterruptedException { + if (useSleep) { + if (count % 100 == 0) { + Thread.sleep(5000); + } + } + + } + + protected void waitForMessagesToBeDelivered() { + // let's give the listeners enough time to read all messages + long maxWaitTime = counter * 1000; + long waitTime = maxWaitTime; + long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); + + synchronized (lock) { + boolean hasMessages = true; + while (hasMessages && waitTime >= 0) { + try { + lock.wait(200); + } catch (InterruptedException e) { + LOG.error(e); + } + // check if all messages have been received + hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter + || haloToTroyCtr < counter; + waitTime = maxWaitTime - (System.currentTimeMillis() - start); + } + } + } + + public MessageSender buildTransactionalProducer(String queueName, Connection connection, boolean isTopic) throws Exception { + + return new MessageSender(queueName, connection, true, isTopic); + } + + public Thread buildProducer(Connection connection, final String queueName) throws Exception { + return buildProducer(connection, queueName, false, false); + } + + public Thread buildProducer(Connection connection, final String queueName, boolean transacted, boolean isTopic) throws Exception { + final MessageSender producer = new MessageSender(queueName, connection, transacted, isTopic); + Thread thread = new Thread() { + public synchronized void run() { + for (int i = 0; i < counter; i++) { + try { + producer.send(payload ); + } catch (Exception e) { + throw new RuntimeException("on " + queueName + " send", e); + } + } + } + }; + return thread; + } + + public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception { + final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName) : session.createQueue(queueName)); + MessageListener messageListener = new MessageListener() { + + public void onMessage(Message message) { + try { + ObjectMessage objectMessage = (ObjectMessage)message; + String s = (String)objectMessage.getObject(); + receiver.receive(s); + if (session.getTransacted()) { + session.commit(); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + inputMessageConsumer.setMessageListener(messageListener); + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java index bc54382d02..12a1f1dd02 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java @@ -1,286 +1,286 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -import junit.framework.TestCase; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/* - * simulate message flow which cause the following exception in the broker - * (exception logged by client)

2007-07-24 13:51:23,624 - * com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure - * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344' - * has not been started. at - * org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230) - * This appears to be consistent in a MacBook. Haven't been able to replicate it - * on Windows though - */ -public class TransactionNotStartedErrorTest extends TestCase { - - private static final Log LOG = LogFactory.getLog(TransactionNotStartedErrorTest.class); - - private static int counter = 500; - - private static int hectorToHaloCtr; - private static int xenaToHaloCtr; - private static int troyToHaloCtr; - - private static int haloToHectorCtr; - private static int haloToXenaCtr; - private static int haloToTroyCtr; - - private String hectorToHalo = "hectorToHalo"; - private String xenaToHalo = "xenaToHalo"; - private String troyToHalo = "troyToHalo"; - - private String haloToHector = "haloToHector"; - private String haloToXena = "haloToXena"; - private String haloToTroy = "haloToTroy"; - - - private BrokerService broker; - - private Connection hectorConnection; - private Connection xenaConnection; - private Connection troyConnection; - private Connection haloConnection; - - private final Object lock = new Object(); - - public Connection createConnection() throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); - return factory.createConnection(); - } - - public Session createSession(Connection connection, boolean transacted) throws JMSException { - return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - } - - public void startBroker() throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.addConnector("tcp://localhost:61616").setName("Default"); - broker.start(); - LOG.info("Starting broker.."); - } - - public void tearDown() throws Exception { - hectorConnection.close(); - xenaConnection.close(); - troyConnection.close(); - haloConnection.close(); - broker.stop(); - } - - public void testTransactionNotStartedError() throws Exception { - startBroker(); - hectorConnection = createConnection(); - Thread hectorThread = buildProducer(hectorConnection, hectorToHalo); - Receiver hHectorReceiver = new Receiver() { - public void receive(String s) throws Exception { - haloToHectorCtr++; - if (haloToHectorCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver); - - troyConnection = createConnection(); - Thread troyThread = buildProducer(troyConnection, troyToHalo); - Receiver hTroyReceiver = new Receiver() { - public void receive(String s) throws Exception { - haloToTroyCtr++; - if (haloToTroyCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver); - - xenaConnection = createConnection(); - Thread xenaThread = buildProducer(xenaConnection, xenaToHalo); - Receiver hXenaReceiver = new Receiver() { - public void receive(String s) throws Exception { - haloToXenaCtr++; - if (haloToXenaCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver); - - haloConnection = createConnection(); - final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection); - final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection); - final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection); - Receiver hectorReceiver = new Receiver() { - public void receive(String s) throws Exception { - hectorToHaloCtr++; - troySender.send("halo to troy because of hector"); - if (hectorToHaloCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - Receiver xenaReceiver = new Receiver() { - public void receive(String s) throws Exception { - xenaToHaloCtr++; - hectorSender.send("halo to hector because of xena"); - if (xenaToHaloCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - Receiver troyReceiver = new Receiver() { - public void receive(String s) throws Exception { - troyToHaloCtr++; - xenaSender.send("halo to xena because of troy"); - if (troyToHaloCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver); - buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver); - buildReceiver(haloConnection, troyToHalo, true, troyReceiver); - - haloConnection.start(); - - troyConnection.start(); - troyThread.start(); - - xenaConnection.start(); - xenaThread.start(); - - hectorConnection.start(); - hectorThread.start(); - waitForMessagesToBeDelivered(); - // number of messages received should match messages sent - assertEquals(hectorToHaloCtr, counter); - LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages"); - assertEquals(xenaToHaloCtr, counter); - LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages"); - assertEquals(troyToHaloCtr, counter); - LOG.info("troyToHalo received " + troyToHaloCtr + " messages"); - assertEquals(haloToHectorCtr, counter); - LOG.info("haloToHector received " + haloToHectorCtr + " messages"); - assertEquals(haloToXenaCtr, counter); - LOG.info("haloToXena received " + haloToXenaCtr + " messages"); - assertEquals(haloToTroyCtr, counter); - LOG.info("haloToTroy received " + haloToTroyCtr + " messages"); - - } - - protected void waitForMessagesToBeDelivered() { - // let's give the listeners enough time to read all messages - long maxWaitTime = counter * 3000; - long waitTime = maxWaitTime; - long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); - - synchronized (lock) { - boolean hasMessages = true; - while (hasMessages && waitTime >= 0) { - try { - lock.wait(200); - } catch (InterruptedException e) { - LOG.error(e); - } - // check if all messages have been received - hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter - || haloToTroyCtr < counter; - waitTime = maxWaitTime - (System.currentTimeMillis() - start); - } - } - } - - public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception { - - return new MessageSender(queueName, connection, true); - } - - public Thread buildProducer(Connection connection, final String queueName) throws Exception { - - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageSender producer = new MessageSender(queueName, connection, false); - Thread thread = new Thread() { - - public synchronized void run() { - for (int i = 0; i < counter; i++) { - try { - producer.send(queueName); - if (session.getTransacted()) { - session.commit(); - } - - } catch (Exception e) { - throw new RuntimeException("on " + queueName + " send", e); - } - } - } - }; - return thread; - } - - public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver) throws Exception { - final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName)); - MessageListener messageListener = new MessageListener() { - - public void onMessage(Message message) { - try { - ObjectMessage objectMessage = (ObjectMessage)message; - String s = (String)objectMessage.getObject(); - receiver.receive(s); - if (session.getTransacted()) { - session.commit(); - } - - } catch (Exception e) { - e.printStackTrace(); - } - } - }; - inputMessageConsumer.setMessageListener(messageListener); - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/* + * simulate message flow which cause the following exception in the broker + * (exception logged by client)

2007-07-24 13:51:23,624 + * com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure + * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344' + * has not been started. at + * org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230) + * This appears to be consistent in a MacBook. Haven't been able to replicate it + * on Windows though + */ +public class TransactionNotStartedErrorTest extends TestCase { + + private static final Log LOG = LogFactory.getLog(TransactionNotStartedErrorTest.class); + + private static int counter = 500; + + private static int hectorToHaloCtr; + private static int xenaToHaloCtr; + private static int troyToHaloCtr; + + private static int haloToHectorCtr; + private static int haloToXenaCtr; + private static int haloToTroyCtr; + + private String hectorToHalo = "hectorToHalo"; + private String xenaToHalo = "xenaToHalo"; + private String troyToHalo = "troyToHalo"; + + private String haloToHector = "haloToHector"; + private String haloToXena = "haloToXena"; + private String haloToTroy = "haloToTroy"; + + + private BrokerService broker; + + private Connection hectorConnection; + private Connection xenaConnection; + private Connection troyConnection; + private Connection haloConnection; + + private final Object lock = new Object(); + + public Connection createConnection() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + return factory.createConnection(); + } + + public Session createSession(Connection connection, boolean transacted) throws JMSException { + return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + } + + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.addConnector("tcp://localhost:61616").setName("Default"); + broker.start(); + LOG.info("Starting broker.."); + } + + public void tearDown() throws Exception { + hectorConnection.close(); + xenaConnection.close(); + troyConnection.close(); + haloConnection.close(); + broker.stop(); + } + + public void testTransactionNotStartedError() throws Exception { + startBroker(); + hectorConnection = createConnection(); + Thread hectorThread = buildProducer(hectorConnection, hectorToHalo); + Receiver hHectorReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToHectorCtr++; + if (haloToHectorCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver); + + troyConnection = createConnection(); + Thread troyThread = buildProducer(troyConnection, troyToHalo); + Receiver hTroyReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToTroyCtr++; + if (haloToTroyCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver); + + xenaConnection = createConnection(); + Thread xenaThread = buildProducer(xenaConnection, xenaToHalo); + Receiver hXenaReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToXenaCtr++; + if (haloToXenaCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver); + + haloConnection = createConnection(); + final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection); + final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection); + final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection); + Receiver hectorReceiver = new Receiver() { + public void receive(String s) throws Exception { + hectorToHaloCtr++; + troySender.send("halo to troy because of hector"); + if (hectorToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + Receiver xenaReceiver = new Receiver() { + public void receive(String s) throws Exception { + xenaToHaloCtr++; + hectorSender.send("halo to hector because of xena"); + if (xenaToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + Receiver troyReceiver = new Receiver() { + public void receive(String s) throws Exception { + troyToHaloCtr++; + xenaSender.send("halo to xena because of troy"); + if (troyToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver); + buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver); + buildReceiver(haloConnection, troyToHalo, true, troyReceiver); + + haloConnection.start(); + + troyConnection.start(); + troyThread.start(); + + xenaConnection.start(); + xenaThread.start(); + + hectorConnection.start(); + hectorThread.start(); + waitForMessagesToBeDelivered(); + // number of messages received should match messages sent + assertEquals(hectorToHaloCtr, counter); + LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages"); + assertEquals(xenaToHaloCtr, counter); + LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages"); + assertEquals(troyToHaloCtr, counter); + LOG.info("troyToHalo received " + troyToHaloCtr + " messages"); + assertEquals(haloToHectorCtr, counter); + LOG.info("haloToHector received " + haloToHectorCtr + " messages"); + assertEquals(haloToXenaCtr, counter); + LOG.info("haloToXena received " + haloToXenaCtr + " messages"); + assertEquals(haloToTroyCtr, counter); + LOG.info("haloToTroy received " + haloToTroyCtr + " messages"); + + } + + protected void waitForMessagesToBeDelivered() { + // let's give the listeners enough time to read all messages + long maxWaitTime = counter * 3000; + long waitTime = maxWaitTime; + long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); + + synchronized (lock) { + boolean hasMessages = true; + while (hasMessages && waitTime >= 0) { + try { + lock.wait(200); + } catch (InterruptedException e) { + LOG.error(e); + } + // check if all messages have been received + hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter + || haloToTroyCtr < counter; + waitTime = maxWaitTime - (System.currentTimeMillis() - start); + } + } + } + + public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception { + + return new MessageSender(queueName, connection, true, false); + } + + public Thread buildProducer(Connection connection, final String queueName) throws Exception { + + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageSender producer = new MessageSender(queueName, connection, false, false); + Thread thread = new Thread() { + + public synchronized void run() { + for (int i = 0; i < counter; i++) { + try { + producer.send(queueName); + if (session.getTransacted()) { + session.commit(); + } + + } catch (Exception e) { + throw new RuntimeException("on " + queueName + " send", e); + } + } + } + }; + return thread; + } + + public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver) throws Exception { + final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName)); + MessageListener messageListener = new MessageListener() { + + public void onMessage(Message message) { + try { + ObjectMessage objectMessage = (ObjectMessage)message; + String s = (String)objectMessage.getObject(); + receiver.receive(s); + if (session.getTransacted()) { + session.commit(); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + inputMessageConsumer.setMessageListener(messageListener); + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java index ab258a5137..0603ae9d49 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java @@ -40,7 +40,6 @@ public class AMQStoreDurableTopicTest extends SimpleDurableTopicTest { protected void setUp() throws Exception { numberofProducers=1; numberOfConsumers=1; - this.consumerSleepDuration=0; super.setUp(); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java index 81d0991e9f..48d84c6356 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java @@ -20,6 +20,9 @@ import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; /** * @version $Revision: 1.3 $ @@ -28,12 +31,22 @@ public class SimpleDurableTopicTest extends SimpleTopicTest { protected void setUp() throws Exception { numberOfDestinations=1; - numberOfConsumers = 1; + numberOfConsumers = 4; numberofProducers = 1; sampleCount=1000; playloadSize = 1024; super.setUp(); } + + protected void configureBroker(BrokerService answer,String uri) throws Exception { + AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory(); + persistenceFactory.setMaxFileLength(1024*16); + answer.setPersistenceFactory(persistenceFactory); + answer.setDeleteAllMessagesOnStartup(true); + answer.addConnector(uri); + answer.setUseShutdownHook(false); + } + protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte payload[]) throws JMSException { PerfProducer pp = new PerfProducer(fac, dest, payload); pp.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -42,7 +55,13 @@ public class SimpleDurableTopicTest extends SimpleTopicTest { protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { PerfConsumer result = new PerfConsumer(fac, dest, "subs:" + number); - result.setInitialDelay(20000); + result.setInitialDelay(2000); + return result; + } + + protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception { + ActiveMQConnectionFactory result = super.createConnectionFactory(uri); + result.setSendAcksAsync(false); return result; } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java index 778c9687c2..a14e527a6a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java @@ -65,7 +65,6 @@ public class SimpleNetworkTest extends SimpleTopicTest { LOG.info("Testing against destination: " + destination); for (int i = 0; i < numberOfConsumers; i++) { consumers[i] = createConsumer(consumerFactory, destination, i); - consumers[i].setSleepDuration(consumerSleepDuration); consumers[i].start(); } for (int i = 0; i < numberofProducers; i++) { diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java index 9836e95e46..3247f51d93 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java @@ -18,18 +18,13 @@ package org.apache.activemq.perf; import java.util.ArrayList; import java.util.List; - import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; - import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; -import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; /** * @version $Revision: 1.3 $ @@ -37,18 +32,24 @@ import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStorag public class SimpleNonPersistentQueueTest extends SimpleQueueTest { protected void setUp() throws Exception { - numberOfConsumers = 10; - numberofProducers = 10; - //this.consumerSleepDuration=100; + numberOfConsumers = 1; + numberofProducers = 1; super.setUp(); } protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException { PerfProducer pp = new PerfProducer(fac, dest, payload); pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - //pp.setTimeToLive(100); + pp.setTimeToLive(100); return pp; } + protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { + PerfConsumer result = new PerfConsumer(fac, dest); + result.setInitialDelay(20*1000); + return result; + } + + /* protected void configureBroker(BrokerService answer,String uri) throws Exception { answer.setPersistent(false); final List policyEntries = new ArrayList(); @@ -65,4 +66,5 @@ public class SimpleNonPersistentQueueTest extends SimpleQueueTest { answer.setDestinationPolicy(policyMap); super.configureBroker(answer, uri); } + */ } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java index 35e60cb171..ab5ddd238b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java @@ -55,9 +55,7 @@ public class SimpleTopicTest extends TestCase { protected byte[] array; protected ConnectionFactory factory; - protected long consumerSleepDuration=0; - - /** + /** * Sets up a test where the producer and consumer have their own connection. * * @see junit.framework.TestCase#setUp() @@ -84,7 +82,6 @@ public class SimpleTopicTest extends TestCase { LOG.info("Testing against destination: " + destination); for (int i = 0; i < numberOfConsumers; i++) { consumers[consumerCount] = createConsumer(factory, destination, consumerCount); - consumers[consumerCount].setSleepDuration(consumerSleepDuration); consumerCount++; } for (int i = 0; i < numberofProducers; i++) { diff --git a/activemq-core/src/test/resources/log4j.properties b/activemq-core/src/test/resources/log4j.properties index 3f12e671d1..d95fe6f509 100755 --- a/activemq-core/src/test/resources/log4j.properties +++ b/activemq-core/src/test/resources/log4j.properties @@ -18,7 +18,7 @@ # # The logging properties used during tests.. # -log4j.rootLogger=INFO, out +log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.activemq.spring=WARN