remove some of the dependencies on port 61616 in current tests.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1153649 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-08-03 20:31:53 +00:00
parent a6b85cc9f3
commit 42e51a3368
23 changed files with 579 additions and 570 deletions

View File

@ -27,8 +27,8 @@ import javax.jms.Destination;
/**
* A useful base class which creates and closes an embedded broker
*
*
*
*
*/
public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
@ -39,7 +39,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
protected boolean useTopic;
protected ActiveMQDestination destination;
protected JmsTemplate template;
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
@ -64,7 +64,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
/**
* Factory method to create a new {@link JmsTemplate}
*
*
* @return a newly created JmsTemplate
*/
protected JmsTemplate createJmsTemplate() {
@ -73,7 +73,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
/**
* Factory method to create a new {@link Destination}
*
*
* @return newly created Destinaiton
*/
protected ActiveMQDestination createDestination() {
@ -101,7 +101,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
/**
* Factory method to create a new {@link ConnectionFactory} instance
*
*
* @return a newly created connection factory
*/
protected ConnectionFactory createConnectionFactory() throws Exception {
@ -110,7 +110,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
/**
* Factory method to create a new broker
*
*
* @throws Exception
*/
protected BrokerService createBroker() throws Exception {

View File

@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@ -61,9 +62,9 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
// Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from
// an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the PrefetchSubscription
producer.send(session.createTextMessage("Msg3"));
session.commit();
// wait for test to complete and the test result to get set
// this happens asynchronously since the messages are delivered asynchronously
synchronized (testMutex) {
@ -71,13 +72,18 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
testMutex.wait();
}
}
//test completed, result is ready
assertTrue("Attempted to retrieve more than one ServerSession at a time", testMutex.testSuccessful);
}
@Override
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
}
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:61616";
bindAddress = "tcp://localhost:0";
super.setUp();
testMutex = new TestMutex();
@ -105,7 +111,7 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
answer.setDestinationPolicy(policyMap);
return answer;
}
protected Queue createQueue() {
return new ActiveMQQueue(getDestinationString());
}

View File

@ -17,18 +17,14 @@
package org.apache.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*
*/
public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ReconnectWithSameClientIDTest.class);
@ -59,8 +55,13 @@ public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport {
useConnection(connection);
}
@Override
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
}
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:61616";
bindAddress = "tcp://localhost:0";
super.setUp();
}
@ -75,9 +76,5 @@ public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport {
protected void useConnection(Connection connection) throws JMSException {
connection.setClientID("foo");
connection.start();
/**
* Session session = connection.createSession(transacted, authMode);
* return session;
*/
}
}

View File

@ -29,19 +29,15 @@ import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
private static final transient Logger LOG = LoggerFactory.getLogger(MasterSlaveTempQueueMemoryTest.class);
String masterBindAddress = "tcp://localhost:61616";
String slaveBindAddress = "tcp://localhost:62616";
BrokerService slave;
/*
@ -51,17 +47,16 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
@Override
protected BrokerService createBroker() throws Exception {
// bindAddress is used by super.createBroker
bindAddress = masterBindAddress;
bindAddress = "tcp://localhost:0";
BrokerService master = super.createBroker();
master.setBrokerName("master");
configureBroker(master);
bindAddress = slaveBindAddress;
slave = super.createBroker();
slave.setBrokerName("slave");
slave.setMasterConnectorURI(masterBindAddress);
slave.setMasterConnectorURI(master.getTransportConnectors().get(0).getPublishableConnectString());
configureBroker(slave);
bindAddress = masterBindAddress;
bindAddress = master.getTransportConnectors().get(0).getPublishableConnectString();
return master;
}
@ -74,15 +69,15 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
// optimized dispatch does not effect the determinism of inflight between
// master and slave in this test
//broker.setDestinationPolicy(policyMap);
}
@Override
protected void startBroker() throws Exception {
// because master will wait for slave to connect it needs
// because master will wait for slave to connect it needs
// to be in a separate thread
Thread starterThread = new Thread() {
Thread starterThread = new Thread() {
public void run() {
try {
broker.setWaitForSlave(true);
@ -94,7 +89,7 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
}
};
starterThread.start();
slave.start();
starterThread.join(60*1000);
assertTrue("slave is indeed a slave", slave.isSlave());
@ -104,7 +99,7 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
protected void tearDown() throws Exception {
slave.stop();
super.tearDown();
}
@Override
@ -112,25 +107,25 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
super.testLoadRequestReply();
Thread.sleep(2000);
// some checks on the slave
AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor(
AdvisoryBroker.class);
assertEquals("the temp queues should not be visible as they are removed", 1, ab.getAdvisoryDestinations().size());
RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor(
RegionBroker.class);
//serverDestination +
assertEquals(6, rb.getDestinationMap().size());
RegionBroker.class);
//serverDestination +
assertEquals(6, rb.getDestinationMap().size());
RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
LOG.info("enqueues " + rb.getDestinationStatistics().getEnqueues().getCount());
assertEquals("enqueues match", rb.getDestinationStatistics().getEnqueues().getCount(), masterRb.getDestinationStatistics().getEnqueues().getCount());
LOG.info("dequeues " + rb.getDestinationStatistics().getDequeues().getCount());
assertEquals("dequeues match",
rb.getDestinationStatistics().getDequeues().getCount(),
@ -145,24 +140,24 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
// slave does not actually dispatch any messages, so no request/reply(2) pair per iteration(COUNT)
// slave estimate must be >= actual master value
// master does not always reach expected total, should be assertEquals.., why?
assertTrue("dispatched to slave is as good as master, master="
assertTrue("dispatched to slave is as good as master, master="
+ masterRb.getDestinationStatistics().getDispatched().getCount(),
rb.getDestinationStatistics().getDispatched().getCount() + 2*messagesToSend >=
rb.getDestinationStatistics().getDispatched().getCount() + 2*messagesToSend >=
masterRb.getDestinationStatistics().getDispatched().getCount());
}
public void testMoreThanPageSizeUnacked() throws Exception {
final int messageCount = Queue.MAX_PAGE_SIZE + 10;
final CountDownLatch latch = new CountDownLatch(1);
serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQSession s = (ActiveMQSession) serverSession;
s.setSessionAsyncDispatch(true);
MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination);
serverConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
latch.await(30L, TimeUnit.SECONDS);
@ -171,41 +166,41 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
}
}
});
MessageProducer producer = clientSession.createProducer(serverDestination);
for (int i =0; i< messageCount; i++) {
Message msg = clientSession.createMessage();
producer.send(msg);
}
Thread.sleep(5000);
RegionBroker slaveRb = (RegionBroker) slave.getBroker().getAdaptor(
RegionBroker.class);
RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
assertEquals("inflight match expected", messageCount, masterRb.getDestinationStatistics().getInflight().getCount());
assertEquals("inflight match expected", messageCount, masterRb.getDestinationStatistics().getInflight().getCount());
assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount());
latch.countDown();
Thread.sleep(5000);
assertEquals("inflight match expected", 0, masterRb.getDestinationStatistics().getInflight().getCount());
assertEquals("inflight match expected", 0, masterRb.getDestinationStatistics().getInflight().getCount());
assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount());
}
public void testLoadRequestReplyWithNoTempQueueDelete() throws Exception {
deleteTempQueue = false;
messagesToSend = 10;
testLoadRequestReply();
}
public void testLoadRequestReplyWithTransactions() throws Exception {
serverTransactional = clientTransactional = true;
messagesToSend = 100;
reInitialiseSessions();
testLoadRequestReply();
}
public void testConcurrentConsumerLoadRequestReplyWithTransactions() throws Exception {
serverTransactional = true;
numConsumers = numProducers = 10;
@ -215,10 +210,10 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
}
protected void reInitialiseSessions() throws Exception {
// reinitialize so they can respect the transactional flags
// reinitialize so they can respect the transactional flags
serverSession.close();
clientSession.close();
serverSession = serverConnection.createSession(serverTransactional,
serverSession = serverConnection.createSession(serverTransactional,
serverTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
clientSession = clientConnection.createSession(clientTransactional,
clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);

View File

@ -25,6 +25,7 @@ import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -48,7 +49,6 @@ import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
@ -90,8 +90,9 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
public void testConnectors() throws Exception{
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
assertEquals("openwire URL port doesn't equal bind Address",
new URI(broker.getOpenWireURL()).getPort(),
new URI(this.broker.getTransportConnectors().get(0).getPublishableConnectString()).getPort());
}
public void testMBeans() throws Exception {
@ -317,10 +318,9 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
String newDestination = getSecondDestinationString();
long queueSize = queue.getQueueSize();
assertTrue(queueSize > 0);
queue.copyMatchingMessagesTo("counter > 2", newDestination);
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
@ -334,6 +334,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
@SuppressWarnings("rawtypes")
protected void assertSendViaMBean() throws Exception {
String queueName = getDestinationString() + ".SendMBBean";
@ -353,7 +354,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
for (int i = 0; i < count; i++) {
String body = "message:" + i;
Map headers = new HashMap();
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("JMSCorrelationID", "MyCorrId");
headers.put("JMSDeliveryMode", Boolean.FALSE);
headers.put("JMSXGroupID", "MyGroupID");
@ -370,7 +371,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
if (compdatalist.length == 0) {
fail("There is no message in the queue:");
}
String[] messageIDs = new String[compdatalist.length];
for (int i = 0; i < compdatalist.length; i++) {
CompositeData cdata = compdatalist[i];
@ -407,7 +407,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertComplexData(i, cdata, "JMSXGroupSeq", 1234);
assertComplexData(i, cdata, "JMSXGroupID", "MyGroupID");
assertComplexData(i, cdata, "Text", "message:" + i);
}
}
@ -632,7 +631,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
}
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:61616";
bindAddress = "tcp://localhost:0";
useTopic = false;
super.setUp();
mbeanServer = broker.getManagementContext().getMBeanServer();
@ -656,6 +655,11 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
super.tearDown();
}
@Override
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(false);
@ -691,7 +695,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
Thread.sleep(1000);
}
protected void useConnectionWithBlobMessage(Connection connection) throws Exception {
connection.setClientID(clientID);
connection.start();
@ -733,7 +736,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
LOG.info(text);
}
protected String getSecondDestinationString() {
return "test.new.destination." + getClass() + "." + getName();
}
@ -815,7 +817,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
} catch (Exception e) {
// expected!
}
}
// Test for AMQ-3029

View File

@ -25,6 +25,7 @@ import javax.management.ObjectName;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
@ -36,8 +37,6 @@ import org.slf4j.LoggerFactory;
/**
* A specific test of Queue.purge() functionality
*
*
*/
public class PurgeTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(PurgeTest.class);
@ -118,7 +117,6 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
Message message = session.createTextMessage("Test Message");
producer.send(message);
MessageConsumer consumer = session.createConsumer(destination);
Message received = consumer.receive(1000);
@ -128,16 +126,12 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
BrokerViewMBean brokerProxy = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true);
brokerProxy.removeQueue(getDestinationString());
producer.send(message);
received = consumer.receive(1000);
assertNotNull("Message not received", received);
assertEquals(message, received);
}
public void testDelete() throws Exception {
@ -209,7 +203,7 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
}
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:61616";
bindAddress = "tcp://localhost:0";
useTopic = false;
super.setUp();
mbeanServer = broker.getManagementContext().getMBeanServer();
@ -233,6 +227,11 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
return answer;
}
@Override
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
}
protected void echo(String text) {
LOG.info(text);
}

View File

@ -29,8 +29,8 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
/**
*
*
*
*
*/
public class AMQ1687Test extends EmbeddedBrokerTestSupport {
@ -40,10 +40,8 @@ public class AMQ1687Test extends EmbeddedBrokerTestSupport {
protected ConnectionFactory createConnectionFactory() throws Exception {
//prefetch change is not required, but test will not fail w/o it, only spew errors in the AMQ log.
return new ActiveMQConnectionFactory(this.bindAddress+"?jms.prefetchPolicy.all=5");
//return super.createConnectionFactory();
//return new ActiveMQConnectionFactory("tcp://localhost:61616");
}
public void testVirtualTopicCreation() throws Exception {
if (connection == null) {
connection = createConnection();
@ -52,10 +50,10 @@ public class AMQ1687Test extends EmbeddedBrokerTestSupport {
ConsumerBean messageList = new ConsumerBean();
messageList.setVerbose(true);
String queueAName = getVirtualTopicConsumerName();
String queueBName = getVirtualTopicConsumerNameB();
// create consumer 'cluster'
ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
ActiveMQQueue queue2 = new ActiveMQQueue(queueBName);
@ -76,16 +74,14 @@ public class AMQ1687Test extends EmbeddedBrokerTestSupport {
for (int i = 0; i < total; i++) {
producer.send(session.createTextMessage("message: " + i));
}
messageList.assertMessagesArrived(total*2);
}
protected String getVirtualTopicName() {
return "VirtualTopic.TEST";
}
protected String getVirtualTopicConsumerName() {
return "Consumer.A.VirtualTopic.TEST";
}
@ -93,8 +89,7 @@ public class AMQ1687Test extends EmbeddedBrokerTestSupport {
protected String getVirtualTopicConsumerNameB() {
return "Consumer.B.VirtualTopic.TEST";
}
protected void setUp() throws Exception {
this.bindAddress="tcp://localhost:61616";
super.setUp();

View File

@ -40,20 +40,20 @@ import org.slf4j.LoggerFactory;
/**
* This is a test case for the issue reported at:
* https://issues.apache.org/activemq/browse/AMQ-1866
*
* If you have a JMS producer sending messages to multiple fast consumers and
* one slow consumer, eventually all consumers will run as slow as
* the slowest consumer.
*
* If you have a JMS producer sending messages to multiple fast consumers and
* one slow consumer, eventually all consumers will run as slow as
* the slowest consumer.
*/
public class AMQ1866 extends TestCase {
private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);
private BrokerService brokerService;
private ArrayList<Thread> threads = new ArrayList<Thread>();
String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";
String ACTIVEMQ_BROKER_URI = "tcp://localhost:61616";
private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
private String ACTIVEMQ_BROKER_URI;
AtomicBoolean shutdown = new AtomicBoolean();
private ActiveMQQueue destination;
@ -65,19 +65,21 @@ public class AMQ1866 extends TestCase {
adaptor.setIndexBinSize(4096);
brokerService.setPersistenceAdapter(adaptor);
brokerService.deleteAllMessages();
// A small max page size makes this issue occur faster.
PolicyMap policyMap = new PolicyMap();
PolicyEntry pe = new PolicyEntry();
pe.setMaxPageSize(1);
policyMap.put(new ActiveMQQueue(">"), pe);
brokerService.setDestinationPolicy(policyMap);
brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
brokerService.start();
ACTIVEMQ_BROKER_URI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
destination = new ActiveMQQueue(getName());
}
@Override
protected void tearDown() throws Exception {
// Stop any running threads.
@ -85,30 +87,29 @@ public class AMQ1866 extends TestCase {
for (Thread t : threads) {
t.interrupt();
t.join();
}
}
brokerService.stop();
}
public void testConsumerSlowDownPrefetch0() throws Exception {
ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0";
ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=0";
doTestConsumerSlowDown();
}
public void testConsumerSlowDownPrefetch10() throws Exception {
ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10";
ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=10";
doTestConsumerSlowDown();
}
public void testConsumerSlowDownDefaultPrefetch() throws Exception {
ACTIVEMQ_BROKER_URI = "tcp://localhost:61616";
doTestConsumerSlowDown();
}
public void doTestConsumerSlowDown() throws Exception {
// Preload the queue.
produce(20000);
Thread producer = new Thread() {
@Override
public void run() {
@ -122,7 +123,7 @@ public class AMQ1866 extends TestCase {
};
threads.add(producer);
producer.start();
// This is the slow consumer.
ConsumerThread c1 = new ConsumerThread("Consumer-1");
threads.add(c1);
@ -139,33 +140,33 @@ public class AMQ1866 extends TestCase {
Thread.sleep(1000);
long c1Counter = c1.counter.getAndSet(0);
long c2Counter = c2.counter.getAndSet(0);
System.out.println("c1: "+c1Counter+", c2: "+c2Counter);
log.debug("c1: "+c1Counter+", c2: "+c2Counter);
totalReceived += c1Counter;
totalReceived += c2Counter;
// Once message have been flowing for a few seconds, start asserting that c2 always gets messages. It should be receiving about 100 / sec
if( i > 10 ) {
assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0);
}
}
}
}
public void produce(int count) throws Exception {
Connection connection=null;
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
factory.setDispatchAsync(true);
connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
connection.start();
for( int i=0 ; i< count; i++ ) {
producer.send(session.createTextMessage(getName()+" Message "+(++i)));
}
} finally {
try {
connection.close();
@ -173,7 +174,7 @@ public class AMQ1866 extends TestCase {
}
}
}
public class ConsumerThread extends Thread {
final AtomicLong counter = new AtomicLong();
@ -185,16 +186,16 @@ public class AMQ1866 extends TestCase {
Connection connection=null;
try {
log.debug(getName() + ": is running");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
factory.setDispatchAsync(true);
connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
while (!shutdown.get()) {
TextMessage msg = (TextMessage)consumer.receive(1000);
if ( msg!=null ) {
@ -202,13 +203,13 @@ public class AMQ1866 extends TestCase {
if (getName().equals("Consumer-1")) {
sleepingTime = 1000 * 1000;
} else {
sleepingTime = 1;
sleepingTime = 1;
}
counter.incrementAndGet();
Thread.sleep(sleepingTime);
}
}
} catch (Exception e) {
} finally {
log.debug(getName() + ": is stopping");

View File

@ -43,21 +43,22 @@ public class AMQ1917Test extends TestCase {
private static final int NUM_MESSAGES = 4000;
private static final int NUM_THREADS = 10;
public static final String REQUEST_QUEUE = "mock.in.queue";
public static final String REPLY_QUEUE = "mock.out.queue";
private static final String REQUEST_QUEUE = "mock.in.queue";
private static final String REPLY_QUEUE = "mock.out.queue";
Destination requestDestination = ActiveMQDestination.createDestination(
private Destination requestDestination = ActiveMQDestination.createDestination(
REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE);
Destination replyDestination = ActiveMQDestination.createDestination(
private Destination replyDestination = ActiveMQDestination.createDestination(
REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE);
CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
CountDownLatch errorLatch = new CountDownLatch(1);
ThreadPoolExecutor tpe;
final String BROKER_URL = "tcp://localhost:61616";
BrokerService broker = null;
private CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
private CountDownLatch errorLatch = new CountDownLatch(1);
private ThreadPoolExecutor tpe;
private final String BROKER_URL = "tcp://localhost:61616";
private String connectionUri;
private BrokerService broker = null;
private boolean working = true;
// trival session/producer pool
final Session[] sessions = new Session[NUM_THREADS];
final MessageProducer[] producers = new MessageProducer[NUM_THREADS];
@ -67,11 +68,13 @@ public class AMQ1917Test extends TestCase {
broker.setPersistent(false);
broker.addConnector(BROKER_URL);
broker.start();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000,
TimeUnit.MILLISECONDS, queue);
ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());
ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());
tpe.setThreadFactory(limitedthreadFactory);
}
@ -79,29 +82,29 @@ public class AMQ1917Test extends TestCase {
broker.stop();
tpe.shutdown();
}
public void testLoadedSendRecieveWithCorrelationId() throws Exception {
public void testLoadedSendRecieveWithCorrelationId() throws Exception {
ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connectionFactory.setBrokerURL(connectionUri);
Connection connection = connectionFactory.createConnection();
setupReceiver(connection);
connection = connectionFactory.createConnection();
connection.start();
// trival session/producer pool
// trival session/producer pool
for (int i=0; i<NUM_THREADS; i++) {
sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producers[i] = sessions[i].createProducer(requestDestination);
}
for (int i = 0; i < NUM_MESSAGES; i++) {
MessageSenderReceiver msr = new MessageSenderReceiver(requestDestination,
replyDestination, "Test Message : " + i);
tpe.execute(msr);
}
while (!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) {
if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) {
fail("there was an error, check the console for thread or thread allocation failure");
@ -129,7 +132,7 @@ public class AMQ1917Test extends TestCase {
TextMessage msg = (TextMessage) consumer.receive(20000);
if (msg == null) {
errorLatch.countDown();
fail("Response timed out."
fail("Response timed out."
+ " latchCount=" + roundTripLatch.getCount());
} else {
String result = msg.getText();
@ -205,7 +208,7 @@ public class AMQ1917Test extends TestCase {
}
}
}
public class LimitedThreadFactory implements ThreadFactory {
int threadCount;
private ThreadFactory factory;
@ -217,7 +220,7 @@ public class AMQ1917Test extends TestCase {
if (++threadCount > NUM_THREADS) {
errorLatch.countDown();
fail("too many threads requested");
}
}
return factory.newThread(arg0);
}
}

View File

@ -38,43 +38,42 @@ import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQ2314Test extends CombinationTestSupport {
public boolean consumeAll = false;
public int deliveryMode = DeliveryMode.NON_PERSISTENT;
private static final Logger LOG = LoggerFactory.getLogger(AMQ2314Test.class);
private static final int MESSAGES_COUNT = 30000;
private static byte[] buf = new byte[1024];
private BrokerService broker;
protected long messageReceiveTimeout = 500L;
private String connectionUri;
private static final long messageReceiveTimeout = 500L;
Destination destination = new ActiveMQTopic("FooTwo");
public void testRemoveSlowSubscriberWhacksTempStore() throws Exception {
runProducerWithHungConsumer();
}
public void testMemoryUsageReleasedOnAllConsumed() throws Exception {
consumeAll = true;
runProducerWithHungConsumer();
// do it again to ensure memory limits are decreased
runProducerWithHungConsumer();
}
public void runProducerWithHungConsumer() throws Exception {
final CountDownLatch consumerContinue = new CountDownLatch(1);
final CountDownLatch consumerReady = new CountDownLatch(1);
final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
factory.setAlwaysSyncSend(true);
// ensure messages are spooled to disk for this consumer
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setTopicPrefetch(500);
@ -99,19 +98,19 @@ public class AMQ2314Test extends CombinationTestSupport {
}
}
};
Thread consumingThread = new Thread("Consuming thread") {
public void run() {
try {
int count = 0;
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
while (consumer.receive(messageReceiveTimeout) == null) {
consumerReady.countDown();
}
count++;
LOG.info("Received one... waiting");
LOG.info("Received one... waiting");
consumerContinue.await();
if (consumeAll) {
LOG.info("Consuming the rest of the messages...");
@ -128,27 +127,27 @@ public class AMQ2314Test extends CombinationTestSupport {
};
consumingThread.start();
consumerReady.await();
producingThread.start();
producingThread.join();
final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage();
LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription);
assertTrue("some temp store has been used", tempUsageBySubscription != origTempUsage);
consumerContinue.countDown();
consumingThread.join();
connection.close();
LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+ broker.getSystemUsage().getTempUsage().getUsage());
assertTrue("temp usage decreased with removed sub", Wait.waitFor(new Wait.Condition(){
public boolean isSatisified() throws Exception {
return broker.getSystemUsage().getTempUsage().getUsage() < tempUsageBySubscription;
}
}));
}
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
@ -159,17 +158,17 @@ public class AMQ2314Test extends CombinationTestSupport {
broker.setAdvisorySupport(false);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://localhost:61616").setName("Default");
broker.addConnector("tcp://localhost:0").setName("Default");
broker.start();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public void tearDown() throws Exception {
broker.stop();
}
public static Test suite() {
return suite(AMQ2314Test.class);
}
}

View File

@ -31,53 +31,55 @@ import org.apache.activemq.broker.jmx.ManagementContext;
/**
* This unit test verifies an issue when
* javax.management.InstanceNotFoundException is thrown after subsequent startups when
* javax.management.InstanceNotFoundException is thrown after subsequent startups when
* managementContext createConnector="false"
*
*/
public class AMQ2513Test extends TestCase {
BrokerService broker;
void createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
private BrokerService broker;
private String connectionUri;
void createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = new BrokerService();
broker.setBrokerName("localhost");
broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
broker.addConnector("tcp://localhost:61616");
broker.addConnector("tcp://localhost:0");
ManagementContext ctx = new ManagementContext();
//if createConnector == true everything is fine
ctx.setCreateConnector(false);
broker.setManagementContext(ctx);
broker.start();
broker.start();
broker.waitUntilStarted();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public void testJmx() throws Exception{
createBroker(true);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
public void testJmx() throws Exception{
createBroker(true);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue("test"));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
producer.send(session.createTextMessage("test123"));
DestinationViewMBean dv = createView();
assertTrue(dv.getQueueSize() > 0);
connection.close();
broker.stop();
broker.waitUntilStopped();
createBroker(false);
factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
factory = new ActiveMQConnectionFactory(connectionUri);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(session.createQueue("test"));
@ -85,20 +87,20 @@ public class AMQ2513Test extends TestCase {
connection.start();
producer.send(session.createTextMessage("test123"));
connection.close();
dv = createView();
assertTrue(dv.getQueueSize() > 0);
broker.stop();
broker.waitUntilStopped();
}
DestinationViewMBean createView() throws Exception {
}
DestinationViewMBean createView() throws Exception {
String domain = "org.apache.activemq";
ObjectName name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
true);
}
}

View File

@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@ -39,17 +38,18 @@ public class AMQ2616Test extends TestCase {
private static final int NUMBER = 2000;
private BrokerService brokerService;
private final ArrayList<Thread> threads = new ArrayList<Thread>();
String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:61616";
AtomicBoolean shutdown = new AtomicBoolean();
private final String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:0";
private final AtomicBoolean shutdown = new AtomicBoolean();
private String connectionUri;
public void testQueueResourcesReleased() throws Exception{
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND);
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(connectionUri);
Connection tempConnection = fac.createConnection();
tempConnection.start();
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue tempQueue = tempSession.createTemporaryQueue();
final MessageConsumer tempConsumer = tempSession.createConsumer(tempQueue);
Connection testConnection = fac.createConnection();
long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -67,8 +67,8 @@ public class AMQ2616Test extends TestCase {
endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
assertEquals(startUsage,endUsage);
}
@Override
protected void setUp() throws Exception {
// Start an embedded broker up.
@ -95,6 +95,10 @@ public class AMQ2616Test extends TestCase {
brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024);
brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
brokerService.start();
brokerService.waitUntilStarted();
connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
new ActiveMQQueue(getName());
}

View File

@ -18,7 +18,6 @@ package org.apache.activemq.bugs;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
@ -26,13 +25,12 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
/**
*
*/
public class CraigsBugTest extends EmbeddedBrokerTestSupport {
private String connectionUri;
public void testConnectionFactory() throws Exception {
final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
final ActiveMQQueue queue = new ActiveMQQueue("testqueue");
final Connection conn = cf.createConnection();
@ -60,8 +58,10 @@ public class CraigsBugTest extends EmbeddedBrokerTestSupport {
}
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:61616";
bindAddress = "tcp://localhost:0";
super.setUp();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
}

View File

@ -41,14 +41,15 @@ public class DataFileNotDeletedTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(DataFileNotDeletedTest.class);
private final CountDownLatch latch = new CountDownLatch(max_messages);
private static int max_messages = 600;
private final static int max_messages = 600;
private static int messageCounter;
private final String destinationName = getName()+"_Queue";
private BrokerService broker;
private Connection receiverConnection;
private Connection producerConnection;
final boolean useTopic = false;
private final boolean useTopic = false;
private String connectionUri;
AMQPersistenceAdapter persistentAdapter;
protected static final String payload = new String(new byte[512]);
@ -61,7 +62,7 @@ public class DataFileNotDeletedTest extends TestCase {
producerConnection = createConnection();
producerConnection.start();
}
@Override
public void tearDown() throws Exception {
receiverConnection.close();
@ -72,16 +73,16 @@ public class DataFileNotDeletedTest extends TestCase {
public void testForDataFileNotDeleted() throws Exception {
doTestForDataFileNotDeleted(false);
}
public void testForDataFileNotDeletedTransacted() throws Exception {
doTestForDataFileNotDeleted(true);
}
private void doTestForDataFileNotDeleted(boolean transacted) throws Exception {
Receiver receiver = new Receiver() {
public void receive(String s) throws Exception {
messageCounter++;
messageCounter++;
latch.countDown();
}
};
@ -94,7 +95,7 @@ public class DataFileNotDeletedTest extends TestCase {
latch.await();
assertEquals(max_messages, messageCounter);
LOG.info("Sent and received + " + messageCounter + ", file count " + persistentAdapter.getAsyncDataManager().getFiles().size());
waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 60000, 2);
waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 60000, 2);
}
private void waitFordataFilesToBeCleanedUp(
@ -111,7 +112,7 @@ public class DataFileNotDeletedTest extends TestCase {
}
private Connection createConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
return factory.createConnection();
}
@ -120,7 +121,7 @@ public class DataFileNotDeletedTest extends TestCase {
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616").setName("Default");
broker.addConnector("tcp://localhost:0").setName("Default");
broker.setPersistenceFactory(new AMQPersistenceAdapterFactory());
AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
// ensure there are a bunch of data files but multiple entries in each
@ -129,10 +130,13 @@ public class DataFileNotDeletedTest extends TestCase {
factory.setCheckpointInterval(500);
factory.setCleanupInterval(500);
factory.setSyncOnWrite(false);
persistentAdapter = (AMQPersistenceAdapter) broker.getPersistenceAdapter();
broker.start();
LOG.info("Starting broker..");
broker.waitUntilStarted();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
private void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception {

View File

@ -16,10 +16,9 @@
*/
package org.apache.activemq.bugs;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
@ -28,128 +27,132 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.RequestTimedOutIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JmsTimeoutTest extends EmbeddedBrokerTestSupport {
static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class);
private int messageSize=1024*64;
private int messageCount=10000;
private final AtomicInteger exceptionCount = new AtomicInteger(0);
/**
* Test the case where the broker is blocked due to a memory limit
* and a producer timeout is set on the connection.
* @throws Exception
*/
public void testBlockedProducerConnectionTimeout() throws Exception {
final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
final ActiveMQDestination queue = createDestination("testqueue");
// we should not take longer than 10 seconds to return from send
cx.setSendTimeout(10000);
Runnable r = new Runnable() {
public void run() {
try {
LOG.info("Sender thread starting");
Session session = cx.createSession(false, 1);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage(createMessageText());
for(int count=0; count<messageCount; count++){
producer.send(message);
}
LOG.info("Done sending..");
} catch (JMSException e) {
if (e.getCause() instanceof RequestTimedOutIOException) {
exceptionCount.incrementAndGet();
} else {
e.printStackTrace();
}
return;
}
static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class);
}
};
cx.start();
Thread producerThread = new Thread(r);
producerThread.start();
producerThread.join(30000);
cx.close();
// We should have a few timeout exceptions as memory store will fill up
assertTrue("No exception from the broker", exceptionCount.get() > 0);
}
private final int messageSize=1024*64;
private final int messageCount=10000;
private final AtomicInteger exceptionCount = new AtomicInteger(0);
/**
* Test the case where the broker is blocked due to a memory limit
* and a producer timeout is set on the connection.
* @throws Exception
*/
public void testBlockedProducerConnectionTimeout() throws Exception {
final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
final ActiveMQDestination queue = createDestination("testqueue");
/**
* Test the case where the broker is blocked due to a memory limit
* with a fail timeout
* @throws Exception
*/
public void testBlockedProducerUsageSendFailTimeout() throws Exception {
final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
final ActiveMQDestination queue = createDestination("testqueue");
// we should not take longer than 10 seconds to return from send
cx.setSendTimeout(10000);
broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
Runnable r = new Runnable() {
public void run() {
try {
LOG.info("Sender thread starting");
Session session = cx.createSession(false, 1);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Runnable r = new Runnable() {
public void run() {
try {
LOG.info("Sender thread starting");
Session session = cx.createSession(false, 1);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage(createMessageText());
for(int count=0; count<messageCount; count++){
producer.send(message);
}
LOG.info("Done sending..");
} catch (JMSException e) {
if (e instanceof ResourceAllocationException || e.getCause() instanceof RequestTimedOutIOException) {
exceptionCount.incrementAndGet();
} else {
e.printStackTrace();
}
return;
}
TextMessage message = session.createTextMessage(createMessageText());
for(int count=0; count<messageCount; count++){
producer.send(message);
}
LOG.info("Done sending..");
} catch (JMSException e) {
if (e.getCause() instanceof RequestTimedOutIOException) {
exceptionCount.incrementAndGet();
} else {
e.printStackTrace();
}
return;
}
}
};
cx.start();
Thread producerThread = new Thread(r);
producerThread.start();
producerThread.join(30000);
cx.close();
// We should have a few timeout exceptions as memory store will fill up
assertTrue("No exception from the broker", exceptionCount.get() > 0);
}
}
};
cx.start();
Thread producerThread = new Thread(r);
producerThread.start();
producerThread.join(30000);
cx.close();
// We should have a few timeout exceptions as memory store will fill up
assertTrue("No exception from the broker", exceptionCount.get() > 0);
}
protected void setUp() throws Exception {
exceptionCount.set(0);
bindAddress = "tcp://localhost:61616";
broker = createBroker();
broker.setDeleteAllMessagesOnStartup(true);
broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
/**
* Test the case where the broker is blocked due to a memory limit
* with a fail timeout
* @throws Exception
*/
public void testBlockedProducerUsageSendFailTimeout() throws Exception {
final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
final ActiveMQDestination queue = createDestination("testqueue");
super.setUp();
}
broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
Runnable r = new Runnable() {
public void run() {
try {
LOG.info("Sender thread starting");
Session session = cx.createSession(false, 1);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
private String createMessageText() {
StringBuffer buffer = new StringBuffer();
buffer.append("<filler>");
for (int i = buffer.length(); i < messageSize; i++) {
buffer.append('X');
}
buffer.append("</filler>");
return buffer.toString();
}
}
TextMessage message = session.createTextMessage(createMessageText());
for(int count=0; count<messageCount; count++){
producer.send(message);
}
LOG.info("Done sending..");
} catch (JMSException e) {
if (e instanceof ResourceAllocationException || e.getCause() instanceof RequestTimedOutIOException) {
exceptionCount.incrementAndGet();
} else {
e.printStackTrace();
}
return;
}
}
};
cx.start();
Thread producerThread = new Thread(r);
producerThread.start();
producerThread.join(30000);
cx.close();
// We should have a few timeout exceptions as memory store will fill up
assertTrue("No exception from the broker", exceptionCount.get() > 0);
}
protected void setUp() throws Exception {
exceptionCount.set(0);
bindAddress = "tcp://localhost:0";
broker = createBroker();
broker.setDeleteAllMessagesOnStartup(true);
broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
super.setUp();
}
@Override
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getPublishableConnectString());
}
private String createMessageText() {
StringBuffer buffer = new StringBuffer();
buffer.append("<filler>");
for (int i = buffer.length(); i < messageSize; i++) {
buffer.append('X');
}
buffer.append("</filler>");
return buffer.toString();
}
}

View File

@ -38,101 +38,101 @@ import javax.management.ObjectName;
/**
* Test to determine if expired messages are being reaped if there is
* no active consumer connected to the broker.
*
* @author bsnyder
*
* no active consumer connected to the broker.
*/
public class MessageExpirationReaperTest {
protected BrokerService broker;
protected ConnectionFactory factory;
protected ActiveMQConnection connection;
protected String destinationName = "TEST.Q";
protected String brokerUrl = "tcp://localhost:61616";
protected String brokerName = "testBroker";
private BrokerService broker;
private ConnectionFactory factory;
private ActiveMQConnection connection;
private final String destinationName = "TEST.Q";
private final String brokerUrl = "tcp://localhost:0";
private final String brokerName = "testBroker";
private String connectionUri;
@Before
public void init() throws Exception {
createBroker();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
factory = createConnectionFactory();
connection = (ActiveMQConnection) factory.createConnection();
connection.start();
}
@After
public void cleanUp() throws Exception {
connection.close();
broker.stop();
}
protected void createBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName(brokerName);
broker.addConnector(brokerUrl);
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(500);
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
broker.start();
}
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(brokerUrl);
return new ActiveMQConnectionFactory(connectionUri);
}
protected Session createSession() throws Exception {
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
@Test
public void testExpiredMessageReaping() throws Exception {
Session producerSession = createSession();
ActiveMQDestination destination = (ActiveMQDestination) producerSession.createQueue(destinationName);
MessageProducer producer = producerSession.createProducer(destination);
producer.setTimeToLive(1000);
final int count = 3;
// Send some messages with an expiration
// Send some messages with an expiration
for (int i = 0; i < count; i++) {
TextMessage message = producerSession.createTextMessage("" + i);
producer.send(message);
}
// Let the messages expire
// Let the messages expire
Thread.sleep(2000);
DestinationViewMBean view = createView(destination);
assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
assertEquals("Incorrect queue size count", 0, view.getQueueSize());
assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
// Send more messages with an expiration
assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
// Send more messages with an expiration
for (int i = 0; i < count; i++) {
TextMessage message = producerSession.createTextMessage("" + i);
producer.send(message);
}
// Let the messages expire
// Let the messages expire
Thread.sleep(2000);
// Simply browse the queue
// Simply browse the queue
Session browserSession = createSession();
QueueBrowser browser = browserSession.createBrowser((Queue) destination);
assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements());
// The messages expire and should be reaped because of the presence of
// the queue browser
assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements());
// The messages expire and should be reaped because of the presence of
// the queue browser
assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
String domain = "org.apache.activemq";
ObjectName name;

View File

@ -35,44 +35,43 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OutOfOrderTestCase extends TestCase {
private static final Logger log = LoggerFactory.getLogger(OutOfOrderTestCase.class);
public static final String BROKER_URL = "tcp://localhost:61616";
private static final int PREFETCH = 10;
private static final String CONNECTION_URL = BROKER_URL + "?jms.prefetchPolicy.all=" + PREFETCH;
public static final String QUEUE_NAME = "QUEUE";
private static final String DESTINATION = "QUEUE?consumer.exclusive=true";
BrokerService brokerService;
Session session;
Connection connection;
int seq = 0;
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.addConnector(BROKER_URL);
brokerService.deleteAllMessages();
brokerService.start();
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
}
private static final Logger log = LoggerFactory.getLogger(OutOfOrderTestCase.class);
protected void tearDown() throws Exception {
session.close();
connection.close();
brokerService.stop();
}
private static final String BROKER_URL = "tcp://localhost:0";
private static final int PREFETCH = 10;
private static final String CONNECTION_URL_OPTIONS = "?jms.prefetchPolicy.all=" + PREFETCH;
private static final String DESTINATION = "QUEUE?consumer.exclusive=true";
private BrokerService brokerService;
private Session session;
private Connection connection;
private String connectionUri;
private int seq = 0;
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.addConnector(BROKER_URL);
brokerService.deleteAllMessages();
brokerService.start();
brokerService.waitUntilStarted();
connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + CONNECTION_URL_OPTIONS);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
}
protected void tearDown() throws Exception {
session.close();
connection.close();
brokerService.stop();
}
public void testOrder() throws Exception {
@ -102,7 +101,7 @@ public class OutOfOrderTestCase extends TestCase {
log.info("Consuming messages 20-29 . . .");
consumeBatch();
}
protected void consumeBatch() throws Exception {
Destination destination = session.createQueue(DESTINATION);
final MessageConsumer messageConsumer = session.createConsumer(destination);
@ -118,15 +117,15 @@ public class OutOfOrderTestCase extends TestCase {
}
}
private String toString(final Message message) throws JMSException {
String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID();
if (message.getJMSRedelivered())
ret += " (redelivered)";
return ret;
}
private String toString(final Message message) throws JMSException {
String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID();
if (message.getJMSRedelivered())
ret += " (redelivered)";
return ret;
private static String createMessageText(final int index) {
return "message #" + index;
}
}
private static String createMessageText(final int index) {
return "message #" + index;
}
}

View File

@ -41,21 +41,28 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test case demonstrating situation where messages are not delivered to consumers.
* Test case demonstrating situation where messages are not delivered to
* consumers.
*/
public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
{
private static final Logger LOG = LoggerFactory.getLogger(QueueWorkerPrefetchTest.class);
public class QueueWorkerPrefetchTest extends TestCase implements
MessageListener {
private static final Logger LOG = LoggerFactory
.getLogger(QueueWorkerPrefetchTest.class);
private static final int BATCH_SIZE = 10;
private static final long WAIT_TIMEOUT = 1000*10;
private static final long WAIT_TIMEOUT = 1000 * 10;
/** The connection URL. */
private static final String CONNECTION_URL = "tcp://localhost:61616";
private static final String BROKER_BIND_ADDRESS = "tcp://localhost:0";
/** The queue prefetch size to use. A value greater than 1 seems to make things work. */
/**
* The queue prefetch size to use. A value greater than 1 seems to make
* things work.
*/
private static final int QUEUE_PREFETCH_SIZE = 1;
/** The number of workers to use. A single worker with a prefetch of 1 works. */
/**
* The number of workers to use. A single worker with a prefetch of 1 works.
*/
private static final int NUM_WORKERS = 2;
/** Embedded JMS broker. */
@ -68,32 +75,37 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
private MessageConsumer masterItemConsumer;
/** The number of acks received by the master. */
private AtomicLong acksReceived = new AtomicLong(0);
private final AtomicLong acksReceived = new AtomicLong(0);
private AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>();
private final AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>();
private String connectionUri;
/** Messages sent to the work-item queue. */
private static class WorkMessage implements Serializable
{
private static class WorkMessage implements Serializable {
private static final long serialVersionUID = 1L;
private final int id;
public WorkMessage(int id) {
this.id = id;
}
@Override
public String toString() {
return "Work: "+id;
return "Work: " + id;
}
}
/**
* The worker process. Consume messages from the work-item queue, possibly creating
* more messages to submit to the work-item queue. For each work item, send an ack
* to the master.
* The worker process. Consume messages from the work-item queue, possibly
* creating more messages to submit to the work-item queue. For each work
* item, send an ack to the master.
*/
private static class Worker implements MessageListener
{
/** Counter shared between workers to decided when new work-item messages are created. */
private static class Worker implements MessageListener {
/**
* Counter shared between workers to decided when new work-item messages
* are created.
*/
private static AtomicInteger counter = new AtomicInteger(0);
/** Session to use. */
@ -105,54 +117,48 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
/** Producer for sending new work items to the work-items queue. */
private MessageProducer workItemProducer;
public Worker(Session session)
throws JMSException
{
public Worker(Session session) throws JMSException {
this.session = session;
masterItemProducer = session.createProducer(session.createQueue("master-item"));
masterItemProducer = session.createProducer(session
.createQueue("master-item"));
Queue workItemQueue = session.createQueue("work-item");
workItemProducer = session.createProducer(workItemQueue);
MessageConsumer workItemConsumer = session.createConsumer(workItemQueue);
MessageConsumer workItemConsumer = session
.createConsumer(workItemQueue);
workItemConsumer.setMessageListener(this);
}
public void onMessage(javax.jms.Message message)
{
try
{
WorkMessage work = (WorkMessage)((ObjectMessage)message).getObject();
public void onMessage(javax.jms.Message message) {
try {
WorkMessage work = (WorkMessage) ((ObjectMessage) message)
.getObject();
long c = counter.incrementAndGet();
// Don't create a new work item for every BATCH_SIZE message. */
if (c % BATCH_SIZE != 0)
{
if (c % BATCH_SIZE != 0) {
// Send new work item to work-item queue.
workItemProducer.send(session.createObjectMessage(
new WorkMessage(work.id+1)));
workItemProducer.send(session
.createObjectMessage(new WorkMessage(work.id + 1)));
}
// Send ack to master.
masterItemProducer.send(session.createObjectMessage(work));
}
catch (JMSException e)
{
} catch (JMSException e) {
throw new IllegalStateException("Something has gone wrong", e);
}
}
/** Close of JMS resources used by worker. */
public void close() throws JMSException
{
public void close() throws JMSException {
masterItemProducer.close();
workItemProducer.close();
session.close();
}
}
/** Master message handler. Process ack messages. */
public void onMessage(javax.jms.Message message)
{
/** Master message handler. Process ack messages. */
public void onMessage(javax.jms.Message message) {
long acks = acksReceived.incrementAndGet();
latch.get().countDown();
if (acks % 1 == 0) {
@ -160,68 +166,72 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
}
}
protected void setUp() throws Exception
{
protected void setUp() throws Exception {
// Create the message broker.
super.setUp();
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(true);
broker.addConnector(CONNECTION_URL);
broker.addConnector(BROKER_BIND_ADDRESS);
broker.start();
broker.waitUntilStarted();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
protected void tearDown() throws Exception
{
protected void tearDown() throws Exception {
// Shut down the message broker.
broker.deleteAllMessages();
broker.stop();
super.tearDown();
}
public void testActiveMQ()
throws Exception
{
public void testActiveMQ() throws Exception {
// Create the connection to the broker.
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(CONNECTION_URL);
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(QUEUE_PREFETCH_SIZE);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
Connection connection = connectionFactory.createConnection();
connection.start();
Session masterSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
workItemProducer = masterSession.createProducer(masterSession.createQueue("work-item"));
masterItemConsumer = masterSession.createConsumer(masterSession.createQueue("master-item"));
Session masterSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
workItemProducer = masterSession.createProducer(masterSession
.createQueue("work-item"));
masterItemConsumer = masterSession.createConsumer(masterSession
.createQueue("master-item"));
masterItemConsumer.setMessageListener(this);
// Create the workers.
Worker[] workers = new Worker[NUM_WORKERS];
for (int i = 0; i < NUM_WORKERS; i++)
{
workers[i] = new Worker(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
for (int i = 0; i < NUM_WORKERS; i++) {
workers[i] = new Worker(connection.createSession(false,
Session.AUTO_ACKNOWLEDGE));
}
// Send a message to the work queue, and wait for the BATCH_SIZE acks from the workers.
// Send a message to the work queue, and wait for the BATCH_SIZE acks
// from the workers.
acksReceived.set(0);
latch.set(new CountDownLatch(BATCH_SIZE));
workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1)));
workItemProducer.send(masterSession
.createObjectMessage(new WorkMessage(1)));
if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
fail("First batch only received " + acksReceived + " messages");
}
LOG.info("First batch received");
LOG.info("First batch received");
// Send another message to the work queue, and wait for the next 1000 acks. It is
// Send another message to the work queue, and wait for the next 1000 acks. It is
// at this point where the workers never get notified of this message, as they
// have a large pending queue. Creating a new worker at this point however will
// have a large pending queue. Creating a new worker at this point however will
// receive this new message.
acksReceived.set(0);
latch.set(new CountDownLatch(BATCH_SIZE));
workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1)));
workItemProducer.send(masterSession
.createObjectMessage(new WorkMessage(1)));
if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
fail("Second batch only received " + acksReceived + " messages");
}

View File

@ -41,8 +41,8 @@ public class SlowConsumerTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(SlowConsumerTest.class);
private static final int MESSAGES_COUNT = 10000;
protected int messageLogFrequency = 2500;
protected long messageReceiveTimeout = 10000L;
private final int messageLogFrequency = 2500;
private final long messageReceiveTimeout = 10000L;
private Socket stompSocket;
private ByteArrayOutputStream inputBuffer;
@ -58,9 +58,10 @@ public class SlowConsumerTest extends TestCase {
broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://localhost:61616").setName("Default");
broker.addConnector("tcp://localhost:0").setName("Default");
broker.start();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getPublishableConnectString());
final Connection connection = factory.createConnection();
connection.start();

View File

@ -43,8 +43,8 @@ import org.slf4j.LoggerFactory;
public class TransactionNotStartedErrorTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(TransactionNotStartedErrorTest.class);
private static int counter = 500;
private static final int counter = 500;
private static int hectorToHaloCtr;
private static int xenaToHaloCtr;
@ -54,14 +54,13 @@ public class TransactionNotStartedErrorTest extends TestCase {
private static int haloToXenaCtr;
private static int haloToTroyCtr;
private String hectorToHalo = "hectorToHalo";
private String xenaToHalo = "xenaToHalo";
private String troyToHalo = "troyToHalo";
private String haloToHector = "haloToHector";
private String haloToXena = "haloToXena";
private String haloToTroy = "haloToTroy";
private final String hectorToHalo = "hectorToHalo";
private final String xenaToHalo = "xenaToHalo";
private final String troyToHalo = "troyToHalo";
private final String haloToHector = "haloToHector";
private final String haloToXena = "haloToXena";
private final String haloToTroy = "haloToTroy";
private BrokerService broker;
@ -72,8 +71,9 @@ public class TransactionNotStartedErrorTest extends TestCase {
private final Object lock = new Object();
public Connection createConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
public Connection createConnection() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getPublishableConnectString());
return factory.createConnection();
}
@ -86,7 +86,7 @@ public class TransactionNotStartedErrorTest extends TestCase {
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616").setName("Default");
broker.addConnector("tcp://localhost:0").setName("Default");
broker.start();
LOG.info("Starting broker..");
}
@ -234,12 +234,10 @@ public class TransactionNotStartedErrorTest extends TestCase {
}
public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
return new MessageSender(queueName, connection, true, false);
}
public Thread buildProducer(Connection connection, final String queueName) throws Exception {
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageSender producer = new MessageSender(queueName, connection, false, false);
Thread thread = new Thread() {

View File

@ -326,7 +326,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
protected BrokerService createBroker() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setEnableStatistics(false);
brokerService.addConnector("tcp://0.0.0.0:61616");
brokerService.addConnector("tcp://0.0.0.0:0");
brokerService.setDeleteAllMessagesOnStartup(true);
PolicyEntry policy = new PolicyEntry();
@ -343,9 +343,9 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
policyMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(policyMap);
if (false) {
// external mysql works a lot faster
//
// if (false) {
// // external mysql works a lot faster
// //
// JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
// BasicDataSource ds = new BasicDataSource();
// com.mysql.jdbc.Driver d = new com.mysql.jdbc.Driver();
@ -358,28 +358,29 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
// jdbc.setDataSource(ds);
// brokerService.setPersistenceAdapter(jdbc);
/* add mysql bits to the pom in the testing dependencies
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.2.2</version>
<scope>test</scope>
</dependency>
/* add mysql bits to the pom in the testing dependencies
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.2.2</version>
<scope>test</scope>
</dependency>
*/
} else {
// } else {
setDefaultPersistenceAdapter(brokerService);
}
// }
return brokerService;
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getPublishableConnectString());
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setAll(1);
factory.setPrefetchPolicy(prefetchPolicy);

View File

@ -1,4 +1,3 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@ -16,6 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
@ -32,10 +32,8 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Rajani Chennamaneni
*
*/
public class DispatchMultipleConsumersTest extends TestCase {
private final static Logger logger = LoggerFactory.getLogger(DispatchMultipleConsumersTest.class);
@ -50,10 +48,10 @@ public class DispatchMultipleConsumersTest extends TestCase {
AtomicInteger consumedCount;
CountDownLatch producerLatch;
CountDownLatch consumerLatch;
String brokerURL = "tcp://localhost:61616";
String brokerURL;
String userName = "";
String password = "";
@Override
protected void setUp() throws Exception {
super.setUp();
@ -61,42 +59,36 @@ public class DispatchMultipleConsumersTest extends TestCase {
broker.setPersistent(true);
broker.setUseJmx(true);
broker.deleteAllMessages();
broker.addConnector("tcp://localhost:61616");
broker.addConnector("tcp://localhost:0");
broker.start();
broker.waitUntilStarted();
dest = new ActiveMQQueue(destinationName);
resetCounters();
brokerURL = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
@Override
protected void tearDown() throws Exception {
// broker.stop();
broker.stop();
broker.waitUntilStopped();
super.tearDown();
}
private void resetCounters() {
sentCount = new AtomicInteger(0);
consumedCount = new AtomicInteger(0);
producerLatch = new CountDownLatch(producerThreads);
consumerLatch = new CountDownLatch(consumerCount);
}
public void testDispatch1() {
for (int i = 1; i <= 5; i++) {
resetCounters();
dispatch();
/*try {
System.out.print("Press Enter to continue/finish:");
//pause to check the counts on JConsole
System.in.read();
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}*/
//check for consumed messages count
assertEquals("Incorrect messages in Iteration " + i, sentCount.get(), consumedCount.get());
}
}
private void dispatch() {
startConsumers();
startProducers();
@ -130,15 +122,13 @@ public class DispatchMultipleConsumersTest extends TestCase {
}
private class ConsumerThread extends Thread {
Connection conn;
Session session;
MessageConsumer consumer;
public ConsumerThread(Connection conn, String name) {
super();
this.conn = conn;
this.setName(name);
logger.info("Created new consumer thread:" + name);
logger.trace("Created new consumer thread:" + name);
try {
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(dest);
@ -170,24 +160,25 @@ public class DispatchMultipleConsumersTest extends TestCase {
nullCount = 0;
}
Thread.sleep(100);
logger.info("Message received:" + msg.getJMSMessageID());
if (logger.isTraceEnabled()) {
logger.trace("Message received:" + msg.getJMSMessageID());
}
msgCount++;
} catch (JMSException e) {
logger.error("Failed to consume:", e);
logger.error("Failed to consume:", e);
} catch (InterruptedException e) {
logger.error("Interrupted!", e);
logger.error("Interrupted!", e);
}
}
try {
consumer.close();
} catch (JMSException e) {
logger.error("Failed to close consumer " + getName(), e);
logger.error("Failed to close consumer " + getName(), e);
}
consumedCount.addAndGet(msgCount);
consumerLatch.countDown();
logger.info("Consumed " + msgCount + " messages using thread " + getName());
logger.trace("Consumed " + msgCount + " messages using thread " + getName());
}
}
private class ProducerThread extends Thread {
@ -195,12 +186,12 @@ public class DispatchMultipleConsumersTest extends TestCase {
Connection conn;
Session session;
MessageProducer producer;
public ProducerThread(ActiveMQConnectionFactory connFactory, int count, String name) {
super();
this.count = count;
this.setName(name);
logger.info("Created new producer thread:" + name);
logger.trace("Created new producer thread:" + name);
try {
conn = connFactory.createConnection();
conn.start();
@ -224,12 +215,13 @@ public class DispatchMultipleConsumersTest extends TestCase {
} catch (JMSException e) {
logger.error(e.getMessage(), e);
} catch (InterruptedException e) {
logger.error("Interrupted!", e);
logger.error("Interrupted!", e);
}
sentCount.addAndGet(i);
producerLatch.countDown();
logger.info("Sent " + i + " messages from thread " + getName());
if (logger.isTraceEnabled()) {
logger.trace("Sent " + i + " messages from thread " + getName());
}
}
}
}

View File

@ -48,7 +48,6 @@ public class MessageGroupCloseTest extends TestCase {
private HashSet<String> closedGroups2 = new HashSet<String>();
// with the prefetch too high, this bug is not realized
private static final String connStr =
//"tcp://localhost:61616";
"vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1";
public void testNewConsumer() throws JMSException, InterruptedException {