mirror of https://github.com/apache/activemq.git
removed a load of System.out statements from unit tests; using commons-logging instead so the maven build is much cleaner
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@546537 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2d3106f56e
commit
aa1aa58caa
|
@ -28,14 +28,17 @@ import org.apache.activemq.memory.UsageManager;
|
|||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||
import org.apache.activemq.network.NetworkConnector;
|
||||
import org.apache.activemq.pool.PooledConnectionFactory;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.jms.core.JmsTemplate;
|
||||
import org.springframework.jms.core.MessageCreator;
|
||||
import org.springframework.jms.listener.DefaultMessageListenerContainer;
|
||||
|
||||
|
||||
public class AMQDeadlockTest3 extends TestCase {
|
||||
private static final transient Log log = LogFactory.getLog(AMQDeadlockTest3.class);
|
||||
|
||||
private static final String URL1 = "tcp://localhost:61616";
|
||||
private static final String URL1 = "tcp://localhost:61616";
|
||||
|
||||
private static final String URL2 = "tcp://localhost:61617";
|
||||
|
||||
|
@ -300,7 +303,7 @@ public class AMQDeadlockTest3 extends TestCase {
|
|||
public void onMessage(Message msg) {
|
||||
|
||||
try {
|
||||
System.out.println("Listener1 Consumed message "+ msg.getIntProperty("count"));
|
||||
log.info("Listener1 Consumed message "+ msg.getIntProperty("count"));
|
||||
|
||||
messageCount.incrementAndGet();
|
||||
doneLatch.countDown();
|
||||
|
@ -363,7 +366,7 @@ public class AMQDeadlockTest3 extends TestCase {
|
|||
}
|
||||
});
|
||||
|
||||
System.out.println("PooledProducer sent message: "+ count.get());
|
||||
log.info("PooledProducer sent message: "+ count.get());
|
||||
// Thread.sleep(1000);
|
||||
}
|
||||
|
||||
|
@ -422,7 +425,7 @@ public class AMQDeadlockTest3 extends TestCase {
|
|||
}
|
||||
});
|
||||
|
||||
System.out.println("Non-PooledProducer sent message: " + count.get());
|
||||
log.info("Non-PooledProducer sent message: " + count.get());
|
||||
|
||||
// Thread.sleep(1000);
|
||||
}
|
||||
|
|
|
@ -40,6 +40,8 @@ import org.apache.activemq.broker.BrokerService;
|
|||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -57,6 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
* @version $Revision$
|
||||
*/
|
||||
public class JmsBenchmark extends JmsTestSupport {
|
||||
private static final transient Log log = LogFactory.getLog(JmsBenchmark.class);
|
||||
|
||||
private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5));
|
||||
private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10"));
|
||||
|
@ -174,13 +177,13 @@ public class JmsBenchmark extends JmsTestSupport {
|
|||
}.start();
|
||||
}
|
||||
|
||||
System.out.println(getName() + ": Waiting for Producers and Consumers to startup.");
|
||||
log.info(getName() + ": Waiting for Producers and Consumers to startup.");
|
||||
connectionsEstablished.acquire();
|
||||
System.out.println("Producers and Consumers are now running. Waiting for system to reach steady state: "
|
||||
log.info("Producers and Consumers are now running. Waiting for system to reach steady state: "
|
||||
+ (SAMPLE_DELAY / 1000.0f) + " seconds");
|
||||
Thread.sleep(1000 * 10);
|
||||
|
||||
System.out.println("Starting sample: "+SAMPLES+" each lasting "+ (SAMPLE_DURATION / 1000.0f) + " seconds");
|
||||
log.info("Starting sample: "+SAMPLES+" each lasting "+ (SAMPLE_DURATION / 1000.0f) + " seconds");
|
||||
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
|
@ -196,11 +199,11 @@ public class JmsBenchmark extends JmsTestSupport {
|
|||
int r = receivedMessages.get();
|
||||
int p = producedMessages.get();
|
||||
|
||||
System.out.println("published: " + p + " msgs at "+ (p * 1000f / (end - start)) + " msgs/sec, "+
|
||||
log.info("published: " + p + " msgs at "+ (p * 1000f / (end - start)) + " msgs/sec, "+
|
||||
"consumed: " + r + " msgs at "+ (r * 1000f / (end - start)) + " msgs/sec");
|
||||
}
|
||||
|
||||
System.out.println("Sample done.");
|
||||
log.info("Sample done.");
|
||||
sampleTimeDone.countDown();
|
||||
|
||||
workerDone.acquire();
|
||||
|
|
|
@ -39,6 +39,8 @@ import org.apache.activemq.broker.BrokerFactory;
|
|||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -51,13 +53,14 @@ import java.util.concurrent.TimeUnit;
|
|||
* @version $Revision$
|
||||
*/
|
||||
public class LoadTestBurnIn extends JmsTestSupport {
|
||||
private static final transient Log log = LogFactory.getLog(LoadTestBurnIn.class);
|
||||
|
||||
public static Test suite() {
|
||||
return suite(LoadTestBurnIn.class);
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
System.out.println("Start: "+getName());
|
||||
log.info("Start: "+getName());
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
|
@ -67,7 +70,7 @@ public class LoadTestBurnIn extends JmsTestSupport {
|
|||
} catch (Throwable e) {
|
||||
e.printStackTrace(System.out);
|
||||
} finally {
|
||||
System.out.println("End: "+getName());
|
||||
log.info("End: "+getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.activemq.command.Message;
|
|||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.command.SessionInfo;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -50,7 +52,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
* @version $Revision: 1.9 $
|
||||
*/
|
||||
public class BrokerBenchmark extends BrokerTestSupport {
|
||||
|
||||
private static final transient Log log = LogFactory.getLog(BrokerBenchmark.class);
|
||||
|
||||
public int PRODUCE_COUNT=Integer.parseInt(System.getProperty("PRODUCE_COUNT","10000"));
|
||||
public ActiveMQDestination destination;
|
||||
public int PRODUCER_COUNT;
|
||||
|
@ -79,7 +82,7 @@ public class BrokerBenchmark extends BrokerTestSupport {
|
|||
|
||||
public void testPerformance() throws Exception {
|
||||
|
||||
System.out.println("Running Benchmark for destination="+destination+", producers="+PRODUCER_COUNT+", consumers="+CONSUMER_COUNT+", deliveryMode="+deliveryMode);
|
||||
log.info("Running Benchmark for destination="+destination+", producers="+PRODUCER_COUNT+", consumers="+CONSUMER_COUNT+", deliveryMode="+deliveryMode);
|
||||
final int CONSUME_COUNT = destination.isTopic() ? CONSUMER_COUNT*PRODUCE_COUNT : PRODUCE_COUNT;
|
||||
|
||||
final Semaphore consumersStarted = new Semaphore(1-(CONSUMER_COUNT));
|
||||
|
@ -138,7 +141,7 @@ public class BrokerBenchmark extends BrokerTestSupport {
|
|||
if(msg!=null) {
|
||||
connection.send(createAck(consumerInfo, msg, counter, MessageAck.STANDARD_ACK_TYPE));
|
||||
} else if ( receiveCounter.get() < CONSUME_COUNT ) {
|
||||
System.out.println("Consumer stall, waiting for message #"+receiveCounter.get()+1);
|
||||
log.info("Consumer stall, waiting for message #"+receiveCounter.get()+1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -192,9 +195,9 @@ public class BrokerBenchmark extends BrokerTestSupport {
|
|||
consumersFinished.acquire();
|
||||
long end2 = System.currentTimeMillis();
|
||||
|
||||
System.out.println("Results for destination="+destination+", producers="+PRODUCER_COUNT+", consumers="+CONSUMER_COUNT+", deliveryMode="+deliveryMode);
|
||||
System.out.println("Produced at messages/sec: "+ (PRODUCE_COUNT*1000.0/(end1-start)));
|
||||
System.out.println("Consumed at messages/sec: "+ (CONSUME_COUNT*1000.0/(end2-start)));
|
||||
log.info("Results for destination="+destination+", producers="+PRODUCER_COUNT+", consumers="+CONSUMER_COUNT+", deliveryMode="+deliveryMode);
|
||||
log.info("Produced at messages/sec: "+ (PRODUCE_COUNT*1000.0/(end1-start)));
|
||||
log.info("Consumed at messages/sec: "+ (CONSUME_COUNT*1000.0/(end2-start)));
|
||||
profilerPause("Benchmark done. Stop profiler ");
|
||||
}
|
||||
|
||||
|
|
|
@ -30,14 +30,15 @@ import org.apache.activemq.broker.BrokerService;
|
|||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
|
||||
import org.apache.activemq.test.JmsTopicSendReceiveTest;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* @version $Revision: 1.5 $
|
||||
*/
|
||||
public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest{
|
||||
private static final transient Log log = LogFactory.getLog(JmsDurableTopicSlowReceiveTest.class);
|
||||
|
||||
private static final org.apache.commons.logging.Log log=org.apache.commons.logging.LogFactory
|
||||
.getLog(JmsDurableTopicSlowReceiveTest.class);
|
||||
protected Connection connection2;
|
||||
protected Session session2;
|
||||
protected Session consumeSession2;
|
||||
|
@ -125,7 +126,7 @@ public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest{
|
|||
producer2.send(consumerDestination2,message);
|
||||
Thread.sleep(50);
|
||||
if(verbose){
|
||||
System.out.println("Sent("+loop+"): "+i);
|
||||
log.debug("Sent("+loop+"): "+i);
|
||||
}
|
||||
count++;
|
||||
}
|
||||
|
@ -158,7 +159,7 @@ public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest{
|
|||
if(msg==null)
|
||||
break;
|
||||
if(verbose) {
|
||||
System.out.println("Received("+loop+"): "+i + " count = " + msg.getIntProperty(countProperyName));
|
||||
log.debug("Received("+loop+"): "+i + " count = " + msg.getIntProperty(countProperyName));
|
||||
}
|
||||
assertNotNull(msg);
|
||||
assertEquals(msg.getJMSType(),"test");
|
||||
|
|
|
@ -29,13 +29,16 @@ import javax.jms.MessageNotReadableException;
|
|||
import javax.jms.MessageNotWriteableException;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQMapMessage;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
/**
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class ActiveMQMapMessageTest extends TestCase {
|
||||
private static final transient Log log = LogFactory.getLog(ActiveMQMapMessageTest.class);
|
||||
|
||||
private String name = "testName";
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
@ -251,7 +254,7 @@ public class ActiveMQMapMessageTest extends TestCase {
|
|||
msg.setObject("short", shortValue);
|
||||
msg.setObject("string", stringValue);
|
||||
} catch (MessageFormatException mfe) {
|
||||
System.out.println("Caught: " + mfe);
|
||||
log.warn("Caught: " + mfe);
|
||||
mfe.printStackTrace();
|
||||
fail("object formats should be correct");
|
||||
}
|
||||
|
|
|
@ -23,7 +23,8 @@ import java.util.Collections;
|
|||
|
||||
import org.apache.activemq.kaha.impl.async.Location;
|
||||
import org.apache.activemq.kaha.impl.async.JournalFacade.RecordLocationFacade;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
/**
|
||||
|
@ -32,6 +33,7 @@ import junit.framework.TestCase;
|
|||
* @version $Revision: 1.1 $
|
||||
*/
|
||||
public class LocationTest extends TestCase {
|
||||
private static final transient Log log = LogFactory.getLog(LocationTest.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
synchronized public void testRecordLocationImplComparison() throws IOException {
|
||||
|
@ -54,7 +56,7 @@ public class LocationTest extends TestCase {
|
|||
Collections.sort(l);
|
||||
|
||||
// Did they get sorted to the correct order?
|
||||
System.out.println(l.get(0));
|
||||
log.debug(l.get(0));
|
||||
assertSame( l.get(0).getLocation(), l1 );
|
||||
assertSame( l.get(1).getLocation(), l2 );
|
||||
assertSame( l.get(2).getLocation(), l3 );
|
||||
|
|
|
@ -25,7 +25,8 @@ import java.util.Iterator;
|
|||
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* Test network reconnects over SSH tunnels. This case can be especially tricky since the SSH tunnels
|
||||
|
@ -34,8 +35,9 @@ import org.apache.activemq.broker.BrokerService;
|
|||
* @author chirino
|
||||
*/
|
||||
public class SSHTunnelNetworkReconnectTest extends NetworkReconnectTest {
|
||||
private static final transient Log log = LogFactory.getLog(SSHTunnelNetworkReconnectTest.class);
|
||||
|
||||
ArrayList processes = new ArrayList();
|
||||
ArrayList processes = new ArrayList();
|
||||
|
||||
|
||||
protected BrokerService createFirstBroker() throws Exception {
|
||||
|
|
|
@ -30,10 +30,15 @@ import org.apache.activemq.broker.BrokerFactory;
|
|||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
|
||||
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class InactiveDurableTopicTest extends TestCase{
|
||||
private static final transient Log log = LogFactory.getLog(InactiveDurableTopicTest.class);
|
||||
|
||||
private static final int MESSAGE_COUNT = 100000;
|
||||
private static final String DEFAULT_PASSWORD="";
|
||||
private static final String USERNAME="testuser";
|
||||
|
@ -125,7 +130,7 @@ public class InactiveDurableTopicTest extends TestCase{
|
|||
msg.setInt("key2",loop);
|
||||
publisher.send(msg,deliveryMode,deliveryPriority,Message.DEFAULT_TIME_TO_LIVE);
|
||||
if (loop%500==0){
|
||||
System.out.println("Sent " + loop + " messages");
|
||||
log.debug("Sent " + loop + " messages");
|
||||
}
|
||||
}
|
||||
this.assertEquals(loop,MESSAGE_COUNT);
|
||||
|
@ -159,7 +164,7 @@ public class InactiveDurableTopicTest extends TestCase{
|
|||
for(loop=0;loop<MESSAGE_COUNT;loop++){
|
||||
Message msg = subscriber.receive();
|
||||
if (loop%500==0){
|
||||
System.out.println("Received " + loop + " messages");
|
||||
log.debug("Received " + loop + " messages");
|
||||
}
|
||||
}
|
||||
this.assertEquals(loop,MESSAGE_COUNT);
|
||||
|
|
|
@ -31,10 +31,15 @@ import org.apache.activemq.broker.BrokerFactory;
|
|||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
|
||||
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* @version $Revision: 454471 $
|
||||
*/
|
||||
public class InactiveQueueTest extends TestCase{
|
||||
private static final transient Log log = LogFactory.getLog(InactiveQueueTest.class);
|
||||
|
||||
private static final int MESSAGE_COUNT = 0;
|
||||
private static final String DEFAULT_PASSWORD="";
|
||||
private static final String USERNAME="testuser";
|
||||
|
@ -95,7 +100,7 @@ public class InactiveQueueTest extends TestCase{
|
|||
msg.setInt("key2",loop);
|
||||
publisher.send(msg,deliveryMode,deliveryPriority,Message.DEFAULT_TIME_TO_LIVE);
|
||||
if (loop%500==0){
|
||||
System.out.println("Sent " + loop + " messages");
|
||||
log.debug("Sent " + loop + " messages");
|
||||
}
|
||||
}
|
||||
Thread.sleep(1000000);
|
||||
|
|
|
@ -24,10 +24,14 @@ import javax.jms.MessageConsumer;
|
|||
import javax.jms.Session;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* @version $Revision: 1.3 $
|
||||
*/
|
||||
public class QueueConnectionMemoryTest extends SimpleQueueTest{
|
||||
private static final transient Log log = LogFactory.getLog(QueueConnectionMemoryTest.class);
|
||||
|
||||
protected void setUp() throws Exception{
|
||||
}
|
||||
|
@ -68,7 +72,7 @@ public class QueueConnectionMemoryTest extends SimpleQueueTest{
|
|||
Session s=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
|
||||
Destination dest = s.createTemporaryQueue();
|
||||
MessageConsumer consumer=s.createConsumer(dest);
|
||||
System.out.println("Created connnection: " + i);
|
||||
log.debug("Created connnection: " + i);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,8 @@ import javax.jms.Session;
|
|||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.jms.core.MessageCreator;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -38,7 +40,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
public class RollbacksWhileConsumingLargeQueueTest extends
|
||||
EmbeddedBrokerTestSupport implements MessageListener {
|
||||
|
||||
protected int numberOfMessagesOnQueue = 6500;
|
||||
private static final transient Log log = LogFactory.getLog(RollbacksWhileConsumingLargeQueueTest.class);
|
||||
|
||||
protected int numberOfMessagesOnQueue = 6500;
|
||||
private Connection connection;
|
||||
private AtomicInteger deliveryCounter = new AtomicInteger(0);
|
||||
private AtomicInteger ackCounter = new AtomicInteger(0);
|
||||
|
@ -88,7 +92,7 @@ public class RollbacksWhileConsumingLargeQueueTest extends
|
|||
}
|
||||
|
||||
if (latch.await(1, TimeUnit.SECONDS)) {
|
||||
System.out.println("Received: " + deliveryCounter.get()
|
||||
log.debug("Received: " + deliveryCounter.get()
|
||||
+ " message(s)");
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -52,7 +52,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
public class ReconnectTest extends TestCase {
|
||||
|
||||
protected static final Log log = LogFactory.getLog(ReconnectTest.class);
|
||||
public static final int MESSAGES_PER_ITTERATION = 10;
|
||||
|
||||
public static final int MESSAGES_PER_ITTERATION = 10;
|
||||
public static final int WORKER_COUNT = 10;
|
||||
private BrokerService bs;
|
||||
private URI tcpUri;
|
||||
|
@ -158,20 +159,20 @@ public class ReconnectTest extends TestCase {
|
|||
|
||||
for( int k=1; k < 5; k++ ) {
|
||||
|
||||
System.out.println("Test run: "+k);
|
||||
log.info("Test run: "+k);
|
||||
|
||||
// Wait for at least one iteration to occur...
|
||||
for (int i=0; i < WORKER_COUNT; i++) {
|
||||
for( int j=0; workers[i].iterations.get() == 0 && j < 5; j++ ) {
|
||||
workers[i].assertNoErrors();
|
||||
System.out.println("Waiting for worker "+i+" to finish an iteration.");
|
||||
log.info("Waiting for worker "+i+" to finish an iteration.");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
assertTrue("Worker "+i+" never completed an interation.", workers[i].iterations.get()!=0);
|
||||
workers[i].assertNoErrors();
|
||||
}
|
||||
|
||||
System.out.println("Simulating transport error to cause reconnect.");
|
||||
log.info("Simulating transport error to cause reconnect.");
|
||||
|
||||
// Simulate a transport failure.
|
||||
for (int i=0; i < WORKER_COUNT; i++) {
|
||||
|
@ -180,12 +181,12 @@ public class ReconnectTest extends TestCase {
|
|||
|
||||
// Wait for the connections to get interrupted...
|
||||
while ( interruptedCount.get() < WORKER_COUNT ) {
|
||||
System.out.println("Waiting for connections to get interrupted.. at: "+interruptedCount.get());
|
||||
log.info("Waiting for connections to get interrupted.. at: "+interruptedCount.get());
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
// let things stablize..
|
||||
System.out.println("Pausing before starting next iterations...");
|
||||
log.info("Pausing before starting next iterations...");
|
||||
Thread.sleep(1000);
|
||||
|
||||
// Reset the counters..
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.activemq.transport.tcp;
|
|||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
@ -27,6 +29,8 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
public class SslTransportFactoryTest extends TestCase {
|
||||
private static final transient Log log = LogFactory.getLog(SslTransportFactoryTest.class);
|
||||
|
||||
private SslTransportFactory factory;
|
||||
private boolean verbose;
|
||||
|
||||
|
@ -113,11 +117,11 @@ public class SslTransportFactoryTest extends TestCase {
|
|||
}
|
||||
|
||||
if (verbose) {
|
||||
System.out.println();
|
||||
System.out.println("Iteration: " + i);
|
||||
System.out.println("Map settings: " + options);
|
||||
log.info();
|
||||
log.info("Iteration: " + i);
|
||||
log.info("Map settings: " + options);
|
||||
for (int x = 0; x < optionSettings.length; x++) {
|
||||
System.out.println("optionSetting[" + x + "] = " + optionSettings[x]);
|
||||
log.info("optionSetting[" + x + "] = " + optionSettings[x]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -132,7 +136,7 @@ public class SslTransportFactoryTest extends TestCase {
|
|||
}
|
||||
|
||||
if (socketStub.getWantClientAuthStatus() != optionSettings[2]) {
|
||||
System.out.println("sheiite");
|
||||
log.info("sheiite");
|
||||
}
|
||||
|
||||
assertEquals("wantClientAuth was not properly set for iteration: " + i,
|
||||
|
|
|
@ -14,29 +14,9 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.Session;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
|
@ -46,304 +26,303 @@ import org.apache.activemq.memory.UsageManager;
|
|||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||
import org.apache.activemq.network.NetworkConnector;
|
||||
import org.apache.activemq.pool.PooledConnectionFactory;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.jms.core.JmsTemplate;
|
||||
import org.springframework.jms.core.MessageCreator;
|
||||
import org.springframework.jms.listener.DefaultMessageListenerContainer;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.Session;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class AMQDeadlockTestW4Brokers extends TestCase {
|
||||
private static final transient Log log = LogFactory.getLog(AMQDeadlockTestW4Brokers.class);
|
||||
private static final String BROKER_URL1 = "tcp://localhost:61616";
|
||||
private static final String BROKER_URL2 = "tcp://localhost:61617";
|
||||
private static final String BROKER_URL3 = "tcp://localhost:61618";
|
||||
private static final String BROKER_URL4 = "tcp://localhost:61619";
|
||||
private static final String URL1 = "tcp://localhost:61616?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
|
||||
private static final String URL2 = "tcp://localhost:61617?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
|
||||
private static final String URL3 = "tcp://localhost:61618?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
|
||||
private static final String URL4 = "tcp://localhost:61619?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
|
||||
private static final String QUEUE1_NAME = "test.queue.1";
|
||||
private static final int MAX_CONSUMERS = 5;
|
||||
private static final int NUM_MESSAGE_TO_SEND = 10000;
|
||||
private static final CountDownLatch latch = new CountDownLatch(MAX_CONSUMERS * NUM_MESSAGE_TO_SEND);
|
||||
|
||||
private static final String BROKER_URL1 = "tcp://localhost:61616";
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
|
||||
private static final String BROKER_URL2 = "tcp://localhost:61617";
|
||||
}
|
||||
|
||||
private static final String BROKER_URL3 = "tcp://localhost:61618";
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
|
||||
private static final String BROKER_URL4 = "tcp://localhost:61619";
|
||||
}
|
||||
|
||||
private static final String URL1 = "tcp://localhost:61616?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
|
||||
public void test4BrokerWithOutLingo() throws Exception {
|
||||
|
||||
private static final String URL2 = "tcp://localhost:61617?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
|
||||
BrokerService brokerService1 = null;
|
||||
BrokerService brokerService2 = null;
|
||||
BrokerService brokerService3 = null;
|
||||
BrokerService brokerService4 = null;
|
||||
ActiveMQConnectionFactory acf1 = null;
|
||||
ActiveMQConnectionFactory acf2 = null;
|
||||
PooledConnectionFactory pcf1 = null;
|
||||
PooledConnectionFactory pcf2 = null;
|
||||
ActiveMQConnectionFactory acf3 = null;
|
||||
ActiveMQConnectionFactory acf4 = null;
|
||||
PooledConnectionFactory pcf3 = null;
|
||||
PooledConnectionFactory pcf4 = null;
|
||||
DefaultMessageListenerContainer container1 = null;
|
||||
|
||||
private static final String URL3 = "tcp://localhost:61618?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
|
||||
try {
|
||||
|
||||
private static final String URL4 = "tcp://localhost:61619?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
|
||||
//Test with and without queue limits.
|
||||
brokerService1 = createBrokerService("broker1", BROKER_URL1,
|
||||
BROKER_URL2, BROKER_URL3, BROKER_URL4, 0 /* 10000000 */);
|
||||
brokerService1.start();
|
||||
brokerService2 = createBrokerService("broker2", BROKER_URL2,
|
||||
BROKER_URL1, BROKER_URL3, BROKER_URL4, 0/* 40000000 */);
|
||||
brokerService2.start();
|
||||
brokerService3 = createBrokerService("broker3", BROKER_URL3,
|
||||
BROKER_URL2, BROKER_URL1, BROKER_URL4, 0/* 10000000 */);
|
||||
brokerService3.start();
|
||||
brokerService4 = createBrokerService("broker4", BROKER_URL4,
|
||||
BROKER_URL1, BROKER_URL3, BROKER_URL2, 0/* 10000000 */);
|
||||
brokerService4.start();
|
||||
|
||||
private static final String QUEUE1_NAME = "test.queue.1";
|
||||
final String failover1 = "failover:("
|
||||
+ URL1
|
||||
+ ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
|
||||
final String failover2 = "failover:("
|
||||
+ URL2
|
||||
+ ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
|
||||
|
||||
private static final int MAX_CONSUMERS = 5;
|
||||
final String failover3 = "failover:("
|
||||
+ URL3
|
||||
+ ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
|
||||
|
||||
private static final int NUM_MESSAGE_TO_SEND = 10000;
|
||||
private static final CountDownLatch latch = new CountDownLatch(MAX_CONSUMERS*NUM_MESSAGE_TO_SEND);
|
||||
final String failover4 = "failover:("
|
||||
+ URL4
|
||||
+ ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
acf1 = createConnectionFactory(failover1);
|
||||
acf2 = createConnectionFactory(failover2);
|
||||
acf3 = createConnectionFactory(failover3);
|
||||
acf4 = createConnectionFactory(failover4);
|
||||
|
||||
}
|
||||
pcf1 = new PooledConnectionFactory(acf1);
|
||||
pcf2 = new PooledConnectionFactory(acf2);
|
||||
pcf3 = new PooledConnectionFactory(acf3);
|
||||
pcf4 = new PooledConnectionFactory(acf4);
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
container1 = createDefaultMessageListenerContainer(acf2,
|
||||
new TestMessageListener1(0), QUEUE1_NAME);
|
||||
container1.afterPropertiesSet();
|
||||
|
||||
}
|
||||
final PooledProducerTask[] task = new PooledProducerTask[4];
|
||||
task[0] = new PooledProducerTask(pcf1, QUEUE1_NAME, "producer1");
|
||||
task[1] = new PooledProducerTask(pcf2, QUEUE1_NAME, "producer2");
|
||||
task[2] = new PooledProducerTask(pcf3, QUEUE1_NAME, "producer3");
|
||||
task[3] = new PooledProducerTask(pcf4, QUEUE1_NAME, "producer4");
|
||||
|
||||
public void test4BrokerWithOutLingo() throws Exception {
|
||||
final ExecutorService executor = Executors.newCachedThreadPool();
|
||||
|
||||
BrokerService brokerService1 = null;
|
||||
BrokerService brokerService2 = null;
|
||||
BrokerService brokerService3 = null;
|
||||
BrokerService brokerService4 = null;
|
||||
ActiveMQConnectionFactory acf1 = null;
|
||||
ActiveMQConnectionFactory acf2 = null;
|
||||
PooledConnectionFactory pcf1 = null;
|
||||
PooledConnectionFactory pcf2 = null;
|
||||
ActiveMQConnectionFactory acf3 = null;
|
||||
ActiveMQConnectionFactory acf4 = null;
|
||||
PooledConnectionFactory pcf3 = null;
|
||||
PooledConnectionFactory pcf4 = null;
|
||||
DefaultMessageListenerContainer container1 = null;
|
||||
for (int i = 0; i < 4; i++) {
|
||||
executor.submit(task[i]);
|
||||
}
|
||||
|
||||
try {
|
||||
latch.await(15, TimeUnit.SECONDS);
|
||||
assertTrue(latch.getCount() == MAX_CONSUMERS * NUM_MESSAGE_TO_SEND);
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
finally {
|
||||
|
||||
//Test with and without queue limits.
|
||||
brokerService1 = createBrokerService("broker1", BROKER_URL1,
|
||||
BROKER_URL2, BROKER_URL3, BROKER_URL4, 0 /* 10000000 */);
|
||||
brokerService1.start();
|
||||
brokerService2 = createBrokerService("broker2", BROKER_URL2,
|
||||
BROKER_URL1, BROKER_URL3, BROKER_URL4, 0/* 40000000 */);
|
||||
brokerService2.start();
|
||||
brokerService3 = createBrokerService("broker3", BROKER_URL3,
|
||||
BROKER_URL2, BROKER_URL1, BROKER_URL4, 0/* 10000000 */);
|
||||
brokerService3.start();
|
||||
brokerService4 = createBrokerService("broker4", BROKER_URL4,
|
||||
BROKER_URL1, BROKER_URL3, BROKER_URL2, 0/* 10000000 */);
|
||||
brokerService4.start();
|
||||
container1.stop();
|
||||
container1.destroy();
|
||||
container1 = null;
|
||||
|
||||
final String failover1 = "failover:("
|
||||
+ URL1
|
||||
+ ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
|
||||
final String failover2 = "failover:("
|
||||
+ URL2
|
||||
+ ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
|
||||
brokerService1.stop();
|
||||
brokerService1 = null;
|
||||
brokerService2.stop();
|
||||
brokerService2 = null;
|
||||
brokerService3.stop();
|
||||
brokerService3 = null;
|
||||
brokerService4.stop();
|
||||
brokerService4 = null;
|
||||
}
|
||||
}
|
||||
|
||||
final String failover3 = "failover:("
|
||||
+ URL3
|
||||
+ ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
|
||||
private BrokerService createBrokerService(final String brokerName,
|
||||
final String uri1, final String uri2, final String uri3,
|
||||
final String uri4, final int queueLimit) throws Exception {
|
||||
final BrokerService brokerService = new BrokerService();
|
||||
|
||||
final String failover4 = "failover:("
|
||||
+ URL4
|
||||
+ ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
|
||||
brokerService.setBrokerName(brokerName);
|
||||
brokerService.setPersistent(false);
|
||||
brokerService.setUseJmx(true);
|
||||
|
||||
acf1 = createConnectionFactory(failover1);
|
||||
acf2 = createConnectionFactory(failover2);
|
||||
acf3 = createConnectionFactory(failover3);
|
||||
acf4 = createConnectionFactory(failover4);
|
||||
final UsageManager memoryManager = new UsageManager();
|
||||
memoryManager.setLimit(100000000);
|
||||
brokerService.setMemoryManager(memoryManager);
|
||||
|
||||
pcf1 = new PooledConnectionFactory(acf1);
|
||||
pcf2 = new PooledConnectionFactory(acf2);
|
||||
pcf3 = new PooledConnectionFactory(acf3);
|
||||
pcf4 = new PooledConnectionFactory(acf4);
|
||||
final ArrayList<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
|
||||
|
||||
final PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue(">");
|
||||
entry.setMemoryLimit(queueLimit);
|
||||
policyEntries.add(entry);
|
||||
|
||||
container1 = createDefaultMessageListenerContainer(acf2,
|
||||
new TestMessageListener1(0), QUEUE1_NAME);
|
||||
container1.afterPropertiesSet();
|
||||
final PolicyMap policyMap = new PolicyMap();
|
||||
policyMap.setPolicyEntries(policyEntries);
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
final PooledProducerTask[] task = new PooledProducerTask[4];
|
||||
task[0] = new PooledProducerTask(pcf1, QUEUE1_NAME, "producer1");
|
||||
task[1] = new PooledProducerTask(pcf2, QUEUE1_NAME, "producer2");
|
||||
task[2] = new PooledProducerTask(pcf3, QUEUE1_NAME, "producer3");
|
||||
task[3] = new PooledProducerTask(pcf4, QUEUE1_NAME, "producer4");
|
||||
final TransportConnector tConnector = new TransportConnector();
|
||||
tConnector.setUri(new URI(uri1));
|
||||
tConnector.setBrokerName(brokerName);
|
||||
tConnector.setName(brokerName + ".transportConnector");
|
||||
brokerService.addConnector(tConnector);
|
||||
|
||||
final ExecutorService executor = Executors.newCachedThreadPool();
|
||||
if (uri2 != null) {
|
||||
final NetworkConnector nc = new DiscoveryNetworkConnector(new URI(
|
||||
"static:" + uri2 + "," + uri3 + "," + uri4));
|
||||
nc.setBridgeTempDestinations(true);
|
||||
nc.setBrokerName(brokerName);
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
executor.submit(task[i]);
|
||||
}
|
||||
// When using queue limits set this to 1
|
||||
nc.setPrefetchSize(1000);
|
||||
nc.setNetworkTTL(1);
|
||||
brokerService.addNetworkConnector(nc);
|
||||
}
|
||||
|
||||
latch.await(15,TimeUnit.SECONDS);
|
||||
assertTrue(latch.getCount()==MAX_CONSUMERS*NUM_MESSAGE_TO_SEND);
|
||||
return brokerService;
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
|
||||
final ConnectionFactory acf, final MessageListener listener,
|
||||
final String queue) {
|
||||
final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
|
||||
container.setConnectionFactory(acf);
|
||||
container.setDestinationName(queue);
|
||||
container.setMessageListener(listener);
|
||||
container.setSessionTransacted(false);
|
||||
container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
|
||||
container.setConcurrentConsumers(MAX_CONSUMERS);
|
||||
return container;
|
||||
}
|
||||
|
||||
container1.stop();
|
||||
container1.destroy();
|
||||
container1 = null;
|
||||
public ActiveMQConnectionFactory createConnectionFactory(final String url) {
|
||||
final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
|
||||
acf.setCopyMessageOnSend(false);
|
||||
acf.setUseAsyncSend(false);
|
||||
acf.setDispatchAsync(true);
|
||||
acf.setUseCompression(false);
|
||||
acf.setOptimizeAcknowledge(false);
|
||||
acf.setOptimizedMessageDispatch(true);
|
||||
acf.setUseAsyncSend(false);
|
||||
|
||||
brokerService1.stop();
|
||||
brokerService1 = null;
|
||||
brokerService2.stop();
|
||||
brokerService2 = null;
|
||||
brokerService3.stop();
|
||||
brokerService3 = null;
|
||||
brokerService4.stop();
|
||||
brokerService4 = null;
|
||||
}
|
||||
return acf;
|
||||
}
|
||||
|
||||
}
|
||||
private class TestMessageListener1 implements MessageListener {
|
||||
private final long waitTime;
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
private BrokerService createBrokerService(final String brokerName,
|
||||
final String uri1, final String uri2, final String uri3,
|
||||
final String uri4, final int queueLimit) throws Exception {
|
||||
final BrokerService brokerService = new BrokerService();
|
||||
public TestMessageListener1(long waitTime) {
|
||||
this.waitTime = waitTime;
|
||||
}
|
||||
|
||||
brokerService.setBrokerName(brokerName);
|
||||
brokerService.setPersistent(false);
|
||||
brokerService.setUseJmx(true);
|
||||
public void onMessage(Message msg) {
|
||||
|
||||
final UsageManager memoryManager = new UsageManager();
|
||||
memoryManager.setLimit(100000000);
|
||||
brokerService.setMemoryManager(memoryManager);
|
||||
try {
|
||||
/*log.info("Listener1 Consumed message "
|
||||
+ msg.getIntProperty("count") + " from "
|
||||
+ msg.getStringProperty("producerName"));*/
|
||||
int value = count.incrementAndGet();
|
||||
if (value % 1000 == 0) {
|
||||
log.info("Consumed message: " + value);
|
||||
}
|
||||
|
||||
final ArrayList<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
|
||||
|
||||
final PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue(">");
|
||||
entry.setMemoryLimit(queueLimit);
|
||||
policyEntries.add(entry);
|
||||
|
||||
final PolicyMap policyMap = new PolicyMap();
|
||||
policyMap.setPolicyEntries(policyEntries);
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
final TransportConnector tConnector = new TransportConnector();
|
||||
tConnector.setUri(new URI(uri1));
|
||||
tConnector.setBrokerName(brokerName);
|
||||
tConnector.setName(brokerName + ".transportConnector");
|
||||
brokerService.addConnector(tConnector);
|
||||
|
||||
if (uri2 != null) {
|
||||
final NetworkConnector nc = new DiscoveryNetworkConnector(new URI(
|
||||
"static:" + uri2 + "," + uri3 + "," + uri4));
|
||||
nc.setBridgeTempDestinations(true);
|
||||
nc.setBrokerName(brokerName);
|
||||
|
||||
// When using queue limits set this to 1
|
||||
nc.setPrefetchSize(1000);
|
||||
nc.setNetworkTTL(1);
|
||||
brokerService.addNetworkConnector(nc);
|
||||
}
|
||||
|
||||
return brokerService;
|
||||
|
||||
}
|
||||
|
||||
public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
|
||||
final ConnectionFactory acf, final MessageListener listener,
|
||||
final String queue) {
|
||||
final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
|
||||
container.setConnectionFactory(acf);
|
||||
container.setDestinationName(queue);
|
||||
container.setMessageListener(listener);
|
||||
container.setSessionTransacted(false);
|
||||
container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
|
||||
container.setConcurrentConsumers(MAX_CONSUMERS);
|
||||
return container;
|
||||
}
|
||||
|
||||
public ActiveMQConnectionFactory createConnectionFactory(final String url) {
|
||||
final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
|
||||
acf.setCopyMessageOnSend(false);
|
||||
acf.setUseAsyncSend(false);
|
||||
acf.setDispatchAsync(true);
|
||||
acf.setUseCompression(false);
|
||||
acf.setOptimizeAcknowledge(false);
|
||||
acf.setOptimizedMessageDispatch(true);
|
||||
acf.setUseAsyncSend(false);
|
||||
|
||||
return acf;
|
||||
}
|
||||
|
||||
private class TestMessageListener1 implements MessageListener {
|
||||
|
||||
private final long waitTime;
|
||||
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
public TestMessageListener1(long waitTime) {
|
||||
this.waitTime = waitTime;
|
||||
|
||||
}
|
||||
|
||||
public void onMessage(Message msg) {
|
||||
|
||||
try {
|
||||
/*System.out.println("Listener1 Consumed message "
|
||||
+ msg.getIntProperty("count") + " from "
|
||||
+ msg.getStringProperty("producerName"));*/
|
||||
int value = count.incrementAndGet();
|
||||
if (value%1000==0){
|
||||
System.out.println("Consumed message: " + value);
|
||||
}
|
||||
|
||||
Thread.sleep(waitTime);
|
||||
Thread.sleep(waitTime);
|
||||
latch.countDown();
|
||||
/*} catch (JMSException e) {
|
||||
e.printStackTrace();*/
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
/*} catch (JMSException e) {
|
||||
e.printStackTrace();*/
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
private class PooledProducerTask implements Runnable {
|
||||
private final String queueName;
|
||||
private final PooledConnectionFactory pcf;
|
||||
private final String producerName;
|
||||
|
||||
private class PooledProducerTask implements Runnable {
|
||||
public PooledProducerTask(final PooledConnectionFactory pcf,
|
||||
final String queueName, final String producerName) {
|
||||
this.pcf = pcf;
|
||||
this.queueName = queueName;
|
||||
this.producerName = producerName;
|
||||
}
|
||||
|
||||
private final String queueName;
|
||||
public void run() {
|
||||
|
||||
private final PooledConnectionFactory pcf;
|
||||
try {
|
||||
|
||||
private final String producerName;
|
||||
final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
|
||||
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
jmsTemplate.setExplicitQosEnabled(true);
|
||||
jmsTemplate.setMessageIdEnabled(false);
|
||||
jmsTemplate.setMessageTimestampEnabled(false);
|
||||
jmsTemplate.afterPropertiesSet();
|
||||
|
||||
public PooledProducerTask(final PooledConnectionFactory pcf,
|
||||
final String queueName, final String producerName) {
|
||||
this.pcf = pcf;
|
||||
this.queueName = queueName;
|
||||
this.producerName = producerName;
|
||||
}
|
||||
final byte[] bytes = new byte[2048];
|
||||
final Random r = new Random();
|
||||
r.nextBytes(bytes);
|
||||
|
||||
public void run() {
|
||||
for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
|
||||
final int count = i;
|
||||
jmsTemplate.send(queueName, new MessageCreator() {
|
||||
public Message createMessage(Session session)
|
||||
throws JMSException {
|
||||
|
||||
try {
|
||||
final BytesMessage message = session
|
||||
.createBytesMessage();
|
||||
|
||||
final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
|
||||
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
jmsTemplate.setExplicitQosEnabled(true);
|
||||
jmsTemplate.setMessageIdEnabled(false);
|
||||
jmsTemplate.setMessageTimestampEnabled(false);
|
||||
jmsTemplate.afterPropertiesSet();
|
||||
message.writeBytes(bytes);
|
||||
message.setIntProperty("count", count);
|
||||
message.setStringProperty("producerName",
|
||||
producerName);
|
||||
return message;
|
||||
}
|
||||
});
|
||||
|
||||
final byte[] bytes = new byte[2048];
|
||||
final Random r = new Random();
|
||||
r.nextBytes(bytes);
|
||||
|
||||
for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
|
||||
final int count = i;
|
||||
jmsTemplate.send(queueName, new MessageCreator() {
|
||||
|
||||
public Message createMessage(Session session)
|
||||
throws JMSException {
|
||||
|
||||
final BytesMessage message = session
|
||||
.createBytesMessage();
|
||||
|
||||
message.writeBytes(bytes);
|
||||
message.setIntProperty("count", count);
|
||||
message.setStringProperty("producerName",
|
||||
producerName);
|
||||
return message;
|
||||
}
|
||||
});
|
||||
|
||||
// System.out.println("PooledProducer " + producerName + " sent message: " + count);
|
||||
|
||||
// Thread.sleep(1000);
|
||||
}
|
||||
|
||||
} catch (final Throwable e) {
|
||||
System.err.println("Producer 1 is exiting.");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
// log.info("PooledProducer " + producerName + " sent message: " + count);
|
||||
|
||||
// Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
catch (final Throwable e) {
|
||||
System.err.println("Producer 1 is exiting.");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@ import org.apache.activemq.EmbeddedBrokerTestSupport;
|
|||
import org.apache.activemq.EmbeddedBrokerAndConnectionTestSupport;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
|
@ -32,6 +34,7 @@ import java.util.Set;
|
|||
* @version $Revision: $
|
||||
*/
|
||||
public class NewConsumerCreatesDestinationTest extends EmbeddedBrokerAndConnectionTestSupport {
|
||||
private static final transient Log log = LogFactory.getLog(NewConsumerCreatesDestinationTest.class);
|
||||
|
||||
private ActiveMQQueue wildcard;
|
||||
|
||||
|
@ -41,8 +44,8 @@ public class NewConsumerCreatesDestinationTest extends EmbeddedBrokerAndConnecti
|
|||
String wildcardText = "org.*" + getDestinationString().substring("org.apache".length());
|
||||
wildcard = new ActiveMQQueue(wildcardText);
|
||||
|
||||
System.out.println("Using wildcard: " + wildcard);
|
||||
System.out.println("on destination: " + destination);
|
||||
log.info("Using wildcard: " + wildcard);
|
||||
log.info("on destination: " + destination);
|
||||
|
||||
assertDestinationCreated(destination, false);
|
||||
assertDestinationCreated(wildcard, false);
|
||||
|
|
|
@ -17,27 +17,27 @@
|
|||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.xbean.BrokerFactoryBean;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.core.io.Resource;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
/**
|
||||
*
|
||||
* Test Publish/Consume queue using the release activemq.xml configuration file
|
||||
*
|
||||
* @version $Revision: 1.2 $
|
||||
*/
|
||||
public class PublishOnQueueConsumedMessageUsingActivemqXMLTest extends PublishOnTopicConsumedMessageTest {
|
||||
private static final transient Log log = LogFactory.getLog(PublishOnQueueConsumedMessageUsingActivemqXMLTest.class);
|
||||
protected static final String JOURNAL_ROOT = "../data/";
|
||||
BrokerService broker;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
/**
|
||||
* Use the transportConnector uri configured on the activemq.xml
|
||||
*
|
||||
* @return ActiveMQConnectionFactory
|
||||
|
@ -47,47 +47,39 @@ public class PublishOnQueueConsumedMessageUsingActivemqXMLTest extends PublishOn
|
|||
return new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sets up a test where the producer and consumer have their own connection.
|
||||
*
|
||||
* @see junit.framework.TestCase#setUp()
|
||||
*/
|
||||
protected void setUp() throws Exception {
|
||||
;
|
||||
File journalFile = new File(JOURNAL_ROOT);
|
||||
recursiveDelete(journalFile);
|
||||
// Create broker from resource
|
||||
System.out.print("Creating broker... ");
|
||||
log.info("Creating broker... ");
|
||||
broker = createBroker("org/apache/activemq/usecases/activemq.xml");
|
||||
log.info("Success");
|
||||
super.setUp();
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* Stops the Broker
|
||||
* @see junit.framework.TestCase#tearDown()
|
||||
*/
|
||||
protected void tearDown() throws Exception {
|
||||
log.info("Closing Broker");
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
log.info("Broker closed...");
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* clean up the journal
|
||||
*/
|
||||
* Stops the Broker
|
||||
* @see junit.framework.TestCase#tearDown()
|
||||
*/
|
||||
protected void tearDown() throws Exception {
|
||||
log.info("Closing Broker");
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
log.info("Broker closed...");
|
||||
}
|
||||
|
||||
/*
|
||||
* clean up the journal
|
||||
*/
|
||||
|
||||
protected static void recursiveDelete(File file) {
|
||||
if( file.isDirectory() ) {
|
||||
if (file.isDirectory()) {
|
||||
File[] files = file.listFiles();
|
||||
for (int i = 0; i < files.length; i++) {
|
||||
recursiveDelete(files[i]);
|
||||
|
@ -108,7 +100,6 @@ public class PublishOnQueueConsumedMessageUsingActivemqXMLTest extends PublishOn
|
|||
|
||||
//assertTrue("Should have a broker!", broker != null);
|
||||
|
||||
|
||||
return broker;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,26 +17,27 @@
|
|||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.xbean.BrokerFactoryBean;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.core.io.Resource;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
/**
|
||||
*
|
||||
* Test Publish/Consume topic using the release activemq.xml configuration file
|
||||
*
|
||||
* @version $Revision: 1.2 $
|
||||
*/
|
||||
public class PublishOnTopicConsumerMessageUsingActivemqXMLTest extends PublishOnTopicConsumedMessageTest {
|
||||
private static final transient Log log = LogFactory.getLog(PublishOnTopicConsumerMessageUsingActivemqXMLTest.class);
|
||||
protected static final String JOURNAL_ROOT = "../data/";
|
||||
BrokerService broker;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
/**
|
||||
* Use the transportConnector uri configured on the activemq.xml
|
||||
*
|
||||
* @return ActiveMQConnectionFactory
|
||||
|
@ -46,50 +47,39 @@ public class PublishOnTopicConsumerMessageUsingActivemqXMLTest extends PublishOn
|
|||
return new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sets up a test where the producer and consumer have their own connection.
|
||||
*
|
||||
* @see junit.framework.TestCase#setUp()
|
||||
*/
|
||||
protected void setUp() throws Exception {
|
||||
;
|
||||
File journalFile = new File(JOURNAL_ROOT);
|
||||
recursiveDelete(journalFile);
|
||||
// Create broker from resource
|
||||
System.out.print("Creating broker... ");
|
||||
log.info("Creating broker... ");
|
||||
broker = createBroker("org/apache/activemq/usecases/activemq.xml");
|
||||
log.info("Success");
|
||||
super.setUp();
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* Stops the Broker
|
||||
* @see junit.framework.TestCase#tearDown()
|
||||
*/
|
||||
protected void tearDown() throws Exception {
|
||||
log.info("Closing Broker");
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
log.info("Broker closed...");
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* clean up the journal
|
||||
*/
|
||||
* Stops the Broker
|
||||
* @see junit.framework.TestCase#tearDown()
|
||||
*/
|
||||
protected void tearDown() throws Exception {
|
||||
log.info("Closing Broker");
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
log.info("Broker closed...");
|
||||
}
|
||||
|
||||
/*
|
||||
* clean up the journal
|
||||
*/
|
||||
|
||||
protected static void recursiveDelete(File file) {
|
||||
if( file.isDirectory() ) {
|
||||
if (file.isDirectory()) {
|
||||
File[] files = file.listFiles();
|
||||
for (int i = 0; i < files.length; i++) {
|
||||
recursiveDelete(files[i]);
|
||||
|
@ -110,7 +100,6 @@ public class PublishOnTopicConsumerMessageUsingActivemqXMLTest extends PublishOn
|
|||
|
||||
//assertTrue("Should have a broker!", broker != null);
|
||||
|
||||
|
||||
return broker;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue