git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@667105 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-06-12 14:31:27 +00:00
parent 4a052d20b1
commit 6f69f1914a
16 changed files with 716 additions and 364 deletions

View File

@ -1226,7 +1226,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException();
} else {
try {
throw JMSExceptionSupport.create(er.getException());
}catch(Throwable e) {
LOG.error("Caught an exception trying to create a JMSException",e);
}
}
}
return response;

View File

@ -71,6 +71,7 @@ public class AsyncDataManager {
public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
public static final String DEFAULT_FILE_PREFIX = "data-";
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
private static final Log LOG = LogFactory.getLog(AsyncDataManager.class);
@ -188,7 +189,7 @@ public class AsyncDataManager {
cleanup();
}
};
Scheduler.executePeriodically(cleanupTask, 1000 * 30);
Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
}
public void lock() throws IOException {
@ -272,6 +273,7 @@ public class AsyncDataManager {
if (currentWriteFile != null) {
currentWriteFile.linkAfter(nextWriteFile);
if (currentWriteFile.isUnused()) {
System.err.println("remove current file unused:" + currentWriteFile);
removeDataFile(currentWriteFile);
}
}
@ -298,7 +300,7 @@ public class AsyncDataManager {
DataFile dataFile = fileMap.get(key);
if (dataFile == null) {
LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file " + filePrefix + "-" + item.getDataFileId());
throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
}
return dataFile;
}
@ -308,7 +310,7 @@ public class AsyncDataManager {
DataFile dataFile = fileMap.get(key);
if (dataFile == null) {
LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file " + filePrefix + "-" + item.getDataFileId());
throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
}
return dataFile.getFile();
}
@ -411,7 +413,9 @@ public class AsyncDataManager {
purgeList.add(dataFile);
}
for (DataFile dataFile : purgeList) {
forceRemoveDataFile(dataFile);
if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
forceRemoveDataFile(dataFile);
}
}
}
@ -463,11 +467,11 @@ public class AsyncDataManager {
dataFile.unlink();
if (archiveDataLogs) {
dataFile.move(getDirectoryArchive());
LOG.debug("moced data file " + dataFile + " to "
LOG.debug("moved data file " + dataFile + " to "
+ getDirectoryArchive());
} else {
boolean result = dataFile.delete();
LOG.debug("discarding data file " + dataFile
LOG.info("discarding data file " + dataFile
+ (result ? "successful " : "failed"));
}
}

View File

@ -113,6 +113,7 @@ public class AMQMessageStore implements MessageStore {
if (debug) {
LOG.debug("Journalled message add for: " + id + ", at: " + location);
}
this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
addMessage(message, location);
} else {
if (debug) {
@ -164,7 +165,6 @@ public class AMQMessageStore implements MessageStore {
try {
lastLocation = location;
messages.put(message.getMessageId(), data);
this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
}finally {
lock.unlock();
}

View File

@ -90,13 +90,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private static final boolean BROKEN_FILE_LOCK;
private static final boolean DISABLE_LOCKING;
private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
private AsyncDataManager asyncDataManager;
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
private long cleanupInterval = 1000 * 30;
private long checkpointInterval = 1000 * 60;
private int maxCheckpointMessageAddSize = 1024 * 4;
private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
@ -116,6 +114,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private boolean persistentIndex=true;
private boolean useNio = true;
private boolean archiveDataLogs=false;
private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
@ -425,8 +424,12 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
inProgress.add(lastDataFile);
Set<Integer> inUse = new HashSet<Integer>(referenceStoreAdapter.getReferenceFileIdsInUse());
asyncDataManager.consolidateDataFilesNotIn(inUse, inProgress);
inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
Location lastActiveTx = transactionStore.checkpoint();
if (lastActiveTx != null) {
lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
}
asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
} catch (IOException e) {
LOG.error("Could not cleanup data files: " + e, e);
}

View File

@ -44,6 +44,7 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
private boolean persistentIndex=true;
private boolean useNio = true;
private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
/**
@ -60,9 +61,18 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
result.setReferenceStoreAdapter(getReferenceStoreAdapter());
result.setUseNio(isUseNio());
result.setMaxFileLength(getMaxFileLength());
result.setCleanupInterval(getCleanupInterval());
return result;
}
public long getCleanupInterval() {
return cleanupInterval;
}
public void setCleanupInterval(long val) {
cleanupInterval = val;
}
/**
* @return the dataDirectory
*/

View File

@ -18,9 +18,12 @@
package org.apache.activemq.store.amq;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
@ -230,13 +233,13 @@ public class AMQTransactionStore implements TransactionStore {
// But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not
// roll over active tx records.
Location rc = null;
Location minimumLocationInUse = null;
synchronized (inflightTransactions) {
for (Iterator<AMQTx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
AMQTx tx = iter.next();
Location location = tx.getLocation();
if (rc == null || rc.compareTo(location) < 0) {
rc = location;
if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
minimumLocationInUse = location;
}
}
}
@ -244,11 +247,11 @@ public class AMQTransactionStore implements TransactionStore {
for (Iterator<AMQTx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
AMQTx tx = iter.next();
Location location = tx.getLocation();
if (rc == null || rc.compareTo(location) < 0) {
rc = location;
if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
minimumLocationInUse = location;
}
}
return rc;
return minimumLocationInUse;
}
}

View File

@ -234,8 +234,8 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
* @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
*/
public Set<Integer> getReferenceFileIdsInUse() throws IOException {
return recordReferences.keySet();
public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException {
return new HashSet<Integer>(recordReferences.keySet());
}
/**

View File

@ -1,41 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
public class MessageSender {
private MessageProducer producer;
private Session session;
public MessageSender(String queueName, Connection connection, boolean useTransactedSession) throws Exception {
session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(session.createQueue(queueName));
}
public void send(String payload) throws Exception {
ObjectMessage message = session.createObjectMessage();
message.setObject(payload);
producer.send(message);
if (session.getTransacted()) {
session.commit();
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
public class MessageSender {
private MessageProducer producer;
private Session session;
public MessageSender(String queueName, Connection connection, boolean useTransactedSession, boolean topic) throws Exception {
session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(topic ? session.createTopic(queueName) : session.createQueue(queueName));
}
public void send(String payload) throws Exception {
ObjectMessage message = session.createObjectMessage();
message.setObject(payload);
producer.send(message);
if (session.getTransacted()) {
session.commit();
}
}
}

View File

@ -0,0 +1,312 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/*
* Try and replicate:
* Caused by: java.io.IOException: Could not locate data file data--188
* at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:302)
* at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:614)
* at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:523)
*/
public class MissingDataFileTest extends TestCase {
private static final Log LOG = LogFactory.getLog(MissingDataFileTest.class);
private static int counter = 300;
private static int hectorToHaloCtr;
private static int xenaToHaloCtr;
private static int troyToHaloCtr;
private static int haloToHectorCtr;
private static int haloToXenaCtr;
private static int haloToTroyCtr;
private String hectorToHalo = "hectorToHalo";
private String xenaToHalo = "xenaToHalo";
private String troyToHalo = "troyToHalo";
private String haloToHector = "haloToHector";
private String haloToXena = "haloToXena";
private String haloToTroy = "haloToTroy";
private BrokerService broker;
private Connection hectorConnection;
private Connection xenaConnection;
private Connection troyConnection;
private Connection haloConnection;
private final Object lock = new Object();
final boolean useTopic = false;
final boolean useSleep = true;
protected static final String payload = new String(new byte[500]);
public Connection createConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
return factory.createConnection();
}
public Session createSession(Connection connection, boolean transacted) throws JMSException {
return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
}
public void startBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616").setName("Default");
SystemUsage systemUsage;
systemUsage = new SystemUsage();
systemUsage.getMemoryUsage().setLimit(1024 * 10); // Just a few messags
broker.setSystemUsage(systemUsage);
AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
factory.setMaxFileLength(2*1024); // ~4 messages
factory.setCleanupInterval(5000); // every few second
broker.start();
LOG.info("Starting broker..");
}
public void tearDown() throws Exception {
hectorConnection.close();
xenaConnection.close();
troyConnection.close();
haloConnection.close();
broker.stop();
}
public void testForNoDataFoundError() throws Exception {
startBroker();
hectorConnection = createConnection();
Thread hectorThread = buildProducer(hectorConnection, hectorToHalo, false, useTopic);
Receiver hHectorReceiver = new Receiver() {
public void receive(String s) throws Exception {
haloToHectorCtr++;
if (haloToHectorCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
possiblySleep(haloToHectorCtr);
}
};
buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver, useTopic);
troyConnection = createConnection();
Thread troyThread = buildProducer(troyConnection, troyToHalo);
Receiver hTroyReceiver = new Receiver() {
public void receive(String s) throws Exception {
haloToTroyCtr++;
if (haloToTroyCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
possiblySleep(haloToTroyCtr);
}
};
buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver, false);
xenaConnection = createConnection();
Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
Receiver hXenaReceiver = new Receiver() {
public void receive(String s) throws Exception {
haloToXenaCtr++;
if (haloToXenaCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
possiblySleep(haloToXenaCtr);
}
};
buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver, false);
haloConnection = createConnection();
final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection, false);
final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection, false);
final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection, false);
Receiver hectorReceiver = new Receiver() {
public void receive(String s) throws Exception {
hectorToHaloCtr++;
troySender.send(payload);
if (hectorToHaloCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
possiblySleep(hectorToHaloCtr);
}
}
};
Receiver xenaReceiver = new Receiver() {
public void receive(String s) throws Exception {
xenaToHaloCtr++;
hectorSender.send(payload);
if (xenaToHaloCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
possiblySleep(xenaToHaloCtr);
}
};
Receiver troyReceiver = new Receiver() {
public void receive(String s) throws Exception {
troyToHaloCtr++;
xenaSender.send(payload);
if (troyToHaloCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver, false);
buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver, false);
buildReceiver(haloConnection, troyToHalo, true, troyReceiver, false);
haloConnection.start();
troyConnection.start();
troyThread.start();
xenaConnection.start();
xenaThread.start();
hectorConnection.start();
hectorThread.start();
waitForMessagesToBeDelivered();
// number of messages received should match messages sent
assertEquals(hectorToHaloCtr, counter);
LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
assertEquals(xenaToHaloCtr, counter);
LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
assertEquals(troyToHaloCtr, counter);
LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
assertEquals(haloToHectorCtr, counter);
LOG.info("haloToHector received " + haloToHectorCtr + " messages");
assertEquals(haloToXenaCtr, counter);
LOG.info("haloToXena received " + haloToXenaCtr + " messages");
assertEquals(haloToTroyCtr, counter);
LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
}
protected void possiblySleep(int count) throws InterruptedException {
if (useSleep) {
if (count % 100 == 0) {
Thread.sleep(5000);
}
}
}
protected void waitForMessagesToBeDelivered() {
// let's give the listeners enough time to read all messages
long maxWaitTime = counter * 1000;
long waitTime = maxWaitTime;
long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
synchronized (lock) {
boolean hasMessages = true;
while (hasMessages && waitTime >= 0) {
try {
lock.wait(200);
} catch (InterruptedException e) {
LOG.error(e);
}
// check if all messages have been received
hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter
|| haloToTroyCtr < counter;
waitTime = maxWaitTime - (System.currentTimeMillis() - start);
}
}
}
public MessageSender buildTransactionalProducer(String queueName, Connection connection, boolean isTopic) throws Exception {
return new MessageSender(queueName, connection, true, isTopic);
}
public Thread buildProducer(Connection connection, final String queueName) throws Exception {
return buildProducer(connection, queueName, false, false);
}
public Thread buildProducer(Connection connection, final String queueName, boolean transacted, boolean isTopic) throws Exception {
final MessageSender producer = new MessageSender(queueName, connection, transacted, isTopic);
Thread thread = new Thread() {
public synchronized void run() {
for (int i = 0; i < counter; i++) {
try {
producer.send(payload );
} catch (Exception e) {
throw new RuntimeException("on " + queueName + " send", e);
}
}
}
};
return thread;
}
public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception {
final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName) : session.createQueue(queueName));
MessageListener messageListener = new MessageListener() {
public void onMessage(Message message) {
try {
ObjectMessage objectMessage = (ObjectMessage)message;
String s = (String)objectMessage.getObject();
receiver.receive(s);
if (session.getTransacted()) {
session.commit();
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
inputMessageConsumer.setMessageListener(messageListener);
}
}

View File

@ -1,286 +1,286 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/*
* simulate message flow which cause the following exception in the broker
* (exception logged by client) <p/> 2007-07-24 13:51:23,624
* com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure
* javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344'
* has not been started. at
* org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230)
* This appears to be consistent in a MacBook. Haven't been able to replicate it
* on Windows though
*/
public class TransactionNotStartedErrorTest extends TestCase {
private static final Log LOG = LogFactory.getLog(TransactionNotStartedErrorTest.class);
private static int counter = 500;
private static int hectorToHaloCtr;
private static int xenaToHaloCtr;
private static int troyToHaloCtr;
private static int haloToHectorCtr;
private static int haloToXenaCtr;
private static int haloToTroyCtr;
private String hectorToHalo = "hectorToHalo";
private String xenaToHalo = "xenaToHalo";
private String troyToHalo = "troyToHalo";
private String haloToHector = "haloToHector";
private String haloToXena = "haloToXena";
private String haloToTroy = "haloToTroy";
private BrokerService broker;
private Connection hectorConnection;
private Connection xenaConnection;
private Connection troyConnection;
private Connection haloConnection;
private final Object lock = new Object();
public Connection createConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
return factory.createConnection();
}
public Session createSession(Connection connection, boolean transacted) throws JMSException {
return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
}
public void startBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616").setName("Default");
broker.start();
LOG.info("Starting broker..");
}
public void tearDown() throws Exception {
hectorConnection.close();
xenaConnection.close();
troyConnection.close();
haloConnection.close();
broker.stop();
}
public void testTransactionNotStartedError() throws Exception {
startBroker();
hectorConnection = createConnection();
Thread hectorThread = buildProducer(hectorConnection, hectorToHalo);
Receiver hHectorReceiver = new Receiver() {
public void receive(String s) throws Exception {
haloToHectorCtr++;
if (haloToHectorCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver);
troyConnection = createConnection();
Thread troyThread = buildProducer(troyConnection, troyToHalo);
Receiver hTroyReceiver = new Receiver() {
public void receive(String s) throws Exception {
haloToTroyCtr++;
if (haloToTroyCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver);
xenaConnection = createConnection();
Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
Receiver hXenaReceiver = new Receiver() {
public void receive(String s) throws Exception {
haloToXenaCtr++;
if (haloToXenaCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver);
haloConnection = createConnection();
final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection);
final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection);
final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection);
Receiver hectorReceiver = new Receiver() {
public void receive(String s) throws Exception {
hectorToHaloCtr++;
troySender.send("halo to troy because of hector");
if (hectorToHaloCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
Receiver xenaReceiver = new Receiver() {
public void receive(String s) throws Exception {
xenaToHaloCtr++;
hectorSender.send("halo to hector because of xena");
if (xenaToHaloCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
Receiver troyReceiver = new Receiver() {
public void receive(String s) throws Exception {
troyToHaloCtr++;
xenaSender.send("halo to xena because of troy");
if (troyToHaloCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver);
buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
buildReceiver(haloConnection, troyToHalo, true, troyReceiver);
haloConnection.start();
troyConnection.start();
troyThread.start();
xenaConnection.start();
xenaThread.start();
hectorConnection.start();
hectorThread.start();
waitForMessagesToBeDelivered();
// number of messages received should match messages sent
assertEquals(hectorToHaloCtr, counter);
LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
assertEquals(xenaToHaloCtr, counter);
LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
assertEquals(troyToHaloCtr, counter);
LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
assertEquals(haloToHectorCtr, counter);
LOG.info("haloToHector received " + haloToHectorCtr + " messages");
assertEquals(haloToXenaCtr, counter);
LOG.info("haloToXena received " + haloToXenaCtr + " messages");
assertEquals(haloToTroyCtr, counter);
LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
}
protected void waitForMessagesToBeDelivered() {
// let's give the listeners enough time to read all messages
long maxWaitTime = counter * 3000;
long waitTime = maxWaitTime;
long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
synchronized (lock) {
boolean hasMessages = true;
while (hasMessages && waitTime >= 0) {
try {
lock.wait(200);
} catch (InterruptedException e) {
LOG.error(e);
}
// check if all messages have been received
hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter
|| haloToTroyCtr < counter;
waitTime = maxWaitTime - (System.currentTimeMillis() - start);
}
}
}
public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
return new MessageSender(queueName, connection, true);
}
public Thread buildProducer(Connection connection, final String queueName) throws Exception {
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageSender producer = new MessageSender(queueName, connection, false);
Thread thread = new Thread() {
public synchronized void run() {
for (int i = 0; i < counter; i++) {
try {
producer.send(queueName);
if (session.getTransacted()) {
session.commit();
}
} catch (Exception e) {
throw new RuntimeException("on " + queueName + " send", e);
}
}
}
};
return thread;
}
public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver) throws Exception {
final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName));
MessageListener messageListener = new MessageListener() {
public void onMessage(Message message) {
try {
ObjectMessage objectMessage = (ObjectMessage)message;
String s = (String)objectMessage.getObject();
receiver.receive(s);
if (session.getTransacted()) {
session.commit();
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
inputMessageConsumer.setMessageListener(messageListener);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/*
* simulate message flow which cause the following exception in the broker
* (exception logged by client) <p/> 2007-07-24 13:51:23,624
* com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure
* javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344'
* has not been started. at
* org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230)
* This appears to be consistent in a MacBook. Haven't been able to replicate it
* on Windows though
*/
public class TransactionNotStartedErrorTest extends TestCase {
private static final Log LOG = LogFactory.getLog(TransactionNotStartedErrorTest.class);
private static int counter = 500;
private static int hectorToHaloCtr;
private static int xenaToHaloCtr;
private static int troyToHaloCtr;
private static int haloToHectorCtr;
private static int haloToXenaCtr;
private static int haloToTroyCtr;
private String hectorToHalo = "hectorToHalo";
private String xenaToHalo = "xenaToHalo";
private String troyToHalo = "troyToHalo";
private String haloToHector = "haloToHector";
private String haloToXena = "haloToXena";
private String haloToTroy = "haloToTroy";
private BrokerService broker;
private Connection hectorConnection;
private Connection xenaConnection;
private Connection troyConnection;
private Connection haloConnection;
private final Object lock = new Object();
public Connection createConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
return factory.createConnection();
}
public Session createSession(Connection connection, boolean transacted) throws JMSException {
return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
}
public void startBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616").setName("Default");
broker.start();
LOG.info("Starting broker..");
}
public void tearDown() throws Exception {
hectorConnection.close();
xenaConnection.close();
troyConnection.close();
haloConnection.close();
broker.stop();
}
public void testTransactionNotStartedError() throws Exception {
startBroker();
hectorConnection = createConnection();
Thread hectorThread = buildProducer(hectorConnection, hectorToHalo);
Receiver hHectorReceiver = new Receiver() {
public void receive(String s) throws Exception {
haloToHectorCtr++;
if (haloToHectorCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver);
troyConnection = createConnection();
Thread troyThread = buildProducer(troyConnection, troyToHalo);
Receiver hTroyReceiver = new Receiver() {
public void receive(String s) throws Exception {
haloToTroyCtr++;
if (haloToTroyCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver);
xenaConnection = createConnection();
Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
Receiver hXenaReceiver = new Receiver() {
public void receive(String s) throws Exception {
haloToXenaCtr++;
if (haloToXenaCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver);
haloConnection = createConnection();
final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection);
final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection);
final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection);
Receiver hectorReceiver = new Receiver() {
public void receive(String s) throws Exception {
hectorToHaloCtr++;
troySender.send("halo to troy because of hector");
if (hectorToHaloCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
Receiver xenaReceiver = new Receiver() {
public void receive(String s) throws Exception {
xenaToHaloCtr++;
hectorSender.send("halo to hector because of xena");
if (xenaToHaloCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
Receiver troyReceiver = new Receiver() {
public void receive(String s) throws Exception {
troyToHaloCtr++;
xenaSender.send("halo to xena because of troy");
if (troyToHaloCtr >= counter) {
synchronized (lock) {
lock.notifyAll();
}
}
}
};
buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver);
buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
buildReceiver(haloConnection, troyToHalo, true, troyReceiver);
haloConnection.start();
troyConnection.start();
troyThread.start();
xenaConnection.start();
xenaThread.start();
hectorConnection.start();
hectorThread.start();
waitForMessagesToBeDelivered();
// number of messages received should match messages sent
assertEquals(hectorToHaloCtr, counter);
LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
assertEquals(xenaToHaloCtr, counter);
LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
assertEquals(troyToHaloCtr, counter);
LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
assertEquals(haloToHectorCtr, counter);
LOG.info("haloToHector received " + haloToHectorCtr + " messages");
assertEquals(haloToXenaCtr, counter);
LOG.info("haloToXena received " + haloToXenaCtr + " messages");
assertEquals(haloToTroyCtr, counter);
LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
}
protected void waitForMessagesToBeDelivered() {
// let's give the listeners enough time to read all messages
long maxWaitTime = counter * 3000;
long waitTime = maxWaitTime;
long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
synchronized (lock) {
boolean hasMessages = true;
while (hasMessages && waitTime >= 0) {
try {
lock.wait(200);
} catch (InterruptedException e) {
LOG.error(e);
}
// check if all messages have been received
hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter
|| haloToTroyCtr < counter;
waitTime = maxWaitTime - (System.currentTimeMillis() - start);
}
}
}
public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
return new MessageSender(queueName, connection, true, false);
}
public Thread buildProducer(Connection connection, final String queueName) throws Exception {
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageSender producer = new MessageSender(queueName, connection, false, false);
Thread thread = new Thread() {
public synchronized void run() {
for (int i = 0; i < counter; i++) {
try {
producer.send(queueName);
if (session.getTransacted()) {
session.commit();
}
} catch (Exception e) {
throw new RuntimeException("on " + queueName + " send", e);
}
}
}
};
return thread;
}
public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver) throws Exception {
final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName));
MessageListener messageListener = new MessageListener() {
public void onMessage(Message message) {
try {
ObjectMessage objectMessage = (ObjectMessage)message;
String s = (String)objectMessage.getObject();
receiver.receive(s);
if (session.getTransacted()) {
session.commit();
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
inputMessageConsumer.setMessageListener(messageListener);
}
}

View File

@ -40,7 +40,6 @@ public class AMQStoreDurableTopicTest extends SimpleDurableTopicTest {
protected void setUp() throws Exception {
numberofProducers=1;
numberOfConsumers=1;
this.consumerSleepDuration=0;
super.setUp();
}
}

View File

@ -20,6 +20,9 @@ import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
/**
* @version $Revision: 1.3 $
@ -28,12 +31,22 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
protected void setUp() throws Exception {
numberOfDestinations=1;
numberOfConsumers = 1;
numberOfConsumers = 4;
numberofProducers = 1;
sampleCount=1000;
playloadSize = 1024;
super.setUp();
}
protected void configureBroker(BrokerService answer,String uri) throws Exception {
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
persistenceFactory.setMaxFileLength(1024*16);
answer.setPersistenceFactory(persistenceFactory);
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector(uri);
answer.setUseShutdownHook(false);
}
protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte payload[]) throws JMSException {
PerfProducer pp = new PerfProducer(fac, dest, payload);
pp.setDeliveryMode(DeliveryMode.PERSISTENT);
@ -42,7 +55,13 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
PerfConsumer result = new PerfConsumer(fac, dest, "subs:" + number);
result.setInitialDelay(20000);
result.setInitialDelay(2000);
return result;
}
protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
ActiveMQConnectionFactory result = super.createConnectionFactory(uri);
result.setSendAcksAsync(false);
return result;
}

View File

@ -65,7 +65,6 @@ public class SimpleNetworkTest extends SimpleTopicTest {
LOG.info("Testing against destination: " + destination);
for (int i = 0; i < numberOfConsumers; i++) {
consumers[i] = createConsumer(consumerFactory, destination, i);
consumers[i].setSleepDuration(consumerSleepDuration);
consumers[i].start();
}
for (int i = 0; i < numberofProducers; i++) {

View File

@ -18,18 +18,13 @@ package org.apache.activemq.perf;
import java.util.ArrayList;
import java.util.List;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
/**
* @version $Revision: 1.3 $
@ -37,18 +32,24 @@ import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStorag
public class SimpleNonPersistentQueueTest extends SimpleQueueTest {
protected void setUp() throws Exception {
numberOfConsumers = 10;
numberofProducers = 10;
//this.consumerSleepDuration=100;
numberOfConsumers = 1;
numberofProducers = 1;
super.setUp();
}
protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
PerfProducer pp = new PerfProducer(fac, dest, payload);
pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//pp.setTimeToLive(100);
pp.setTimeToLive(100);
return pp;
}
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
PerfConsumer result = new PerfConsumer(fac, dest);
result.setInitialDelay(20*1000);
return result;
}
/*
protected void configureBroker(BrokerService answer,String uri) throws Exception {
answer.setPersistent(false);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
@ -65,4 +66,5 @@ public class SimpleNonPersistentQueueTest extends SimpleQueueTest {
answer.setDestinationPolicy(policyMap);
super.configureBroker(answer, uri);
}
*/
}

View File

@ -55,9 +55,7 @@ public class SimpleTopicTest extends TestCase {
protected byte[] array;
protected ConnectionFactory factory;
protected long consumerSleepDuration=0;
/**
/**
* Sets up a test where the producer and consumer have their own connection.
*
* @see junit.framework.TestCase#setUp()
@ -84,7 +82,6 @@ public class SimpleTopicTest extends TestCase {
LOG.info("Testing against destination: " + destination);
for (int i = 0; i < numberOfConsumers; i++) {
consumers[consumerCount] = createConsumer(factory, destination, consumerCount);
consumers[consumerCount].setSleepDuration(consumerSleepDuration);
consumerCount++;
}
for (int i = 0; i < numberofProducers; i++) {

View File

@ -18,7 +18,7 @@
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.spring=WARN