mirror of https://github.com/apache/activemq.git
Reduce the overall time to run the AMQP tests
This commit is contained in:
parent
8b36701fc3
commit
94937e855a
|
@ -36,11 +36,10 @@ import javax.jms.TextMessage;
|
|||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.spring.SpringSslContext;
|
||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.store.kahadb.KahaDBStore;
|
||||
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
|
||||
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
|
||||
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AMQ4563Test extends AmqpTestSupport {
|
||||
|
@ -57,27 +56,26 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
|
||||
Connection connection = createAMQConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue("txqueue");
|
||||
MessageProducer p = session.createProducer(queue);
|
||||
Queue queue = session.createQueue(name.getMethodName());
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
TextMessage message = null;
|
||||
for (int i=0; i < messagesSent; i++) {
|
||||
for (int i = 0; i < messagesSent; i++) {
|
||||
message = session.createTextMessage();
|
||||
String messageText = "Hello " + i + " sent at " + new java.util.Date().toString();
|
||||
message.setText(messageText);
|
||||
LOG.debug(">>>> Sent [{}]", messageText);
|
||||
p.send(message);
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
// After the first restart we should get all messages sent above
|
||||
QueueImpl qpidQueue = new QueueImpl("queue://txqueue");
|
||||
restartBroker(connection, session);
|
||||
int messagesReceived = readAllMessages(qpidQueue);
|
||||
int messagesReceived = readAllMessages(name.getMethodName());
|
||||
assertEquals(messagesSent, messagesReceived);
|
||||
|
||||
// This time there should be no messages on this queue
|
||||
restartBroker(connection, session);
|
||||
messagesReceived = readAllMessages(qpidQueue);
|
||||
assertEquals(0, messagesReceived);
|
||||
QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
|
||||
assertEquals(0, queueView.getQueueSize());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
|
@ -87,7 +85,7 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
|
||||
Connection connection = createAMQPConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue("txqueue");
|
||||
Queue queue = session.createQueue(name.getMethodName());
|
||||
MessageProducer p = session.createProducer(queue);
|
||||
TextMessage message = session.createTextMessage();
|
||||
String messageText = "Hello sent at " + new java.util.Date().toString();
|
||||
|
@ -98,19 +96,18 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
restartBroker(connection, session);
|
||||
String selector = "JMSMessageID = '" + message.getJMSMessageID() + "'";
|
||||
LOG.info("Using selector: {}", selector);
|
||||
int messagesReceived = readAllMessages(queue, selector);
|
||||
int messagesReceived = readAllMessages(name.getMethodName(), selector);
|
||||
assertEquals(1, messagesReceived);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSelectingOnActiveMQMessageID() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
QueueImpl queue = new QueueImpl("queue://txqueue");
|
||||
assertTrue(brokerService.isPersistent());
|
||||
|
||||
Connection connection = createAMQConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createQueue("txqueue");
|
||||
Destination destination = session.createQueue(name.getMethodName());
|
||||
MessageProducer p = session.createProducer(destination);
|
||||
TextMessage message = session.createTextMessage();
|
||||
String messageText = "Hello sent at " + new java.util.Date().toString();
|
||||
|
@ -121,7 +118,7 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
restartBroker(connection, session);
|
||||
String selector = "JMSMessageID = '" + message.getJMSMessageID() + "'";
|
||||
LOG.info("Using selector: {}", selector);
|
||||
int messagesReceived = readAllMessages(queue, selector);
|
||||
int messagesReceived = readAllMessages(name.getMethodName(), selector);
|
||||
assertEquals(1, messagesReceived);
|
||||
}
|
||||
|
||||
|
@ -133,7 +130,7 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
|
||||
Connection connection = createAMQPConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue("txqueue");
|
||||
Queue queue = session.createQueue(name.getMethodName());
|
||||
MessageProducer p = session.createProducer(queue);
|
||||
TextMessage message = null;
|
||||
for (int i=0; i < messagesSent; i++) {
|
||||
|
@ -146,23 +143,24 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
|
||||
// After the first restart we should get all messages sent above
|
||||
restartBroker(connection, session);
|
||||
int messagesReceived = readAllMessages(queue);
|
||||
int messagesReceived = readAllMessages(name.getMethodName());
|
||||
assertEquals(messagesSent, messagesReceived);
|
||||
|
||||
// This time there should be no messages on this queue
|
||||
restartBroker(connection, session);
|
||||
messagesReceived = readAllMessages(queue);
|
||||
assertEquals(0, messagesReceived);
|
||||
QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
|
||||
assertEquals(0, queueView.getQueueSize());
|
||||
}
|
||||
|
||||
private int readAllMessages(Queue queue) throws JMSException {
|
||||
return readAllMessages(queue, null);
|
||||
private int readAllMessages(String queueName) throws JMSException {
|
||||
return readAllMessages(queueName, null);
|
||||
}
|
||||
|
||||
private int readAllMessages(Queue queue, String selector) throws JMSException {
|
||||
private int readAllMessages(String queueName, String selector) throws JMSException {
|
||||
Connection connection = createAMQPConnection();
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
int messagesReceived = 0;
|
||||
MessageConsumer consumer;
|
||||
if( selector==null ) {
|
||||
|
@ -170,14 +168,25 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
} else {
|
||||
consumer = session.createConsumer(queue, selector);
|
||||
}
|
||||
Message msg = consumer.receive(5000);
|
||||
|
||||
try {
|
||||
// Try to get out quickly if there are no messages on the broker side
|
||||
QueueViewMBean queueView = getProxyToQueue(queue.getQueueName());
|
||||
if (queueView.getQueueSize() == 0) {
|
||||
return 0;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Error during destination check: {}", e);
|
||||
}
|
||||
|
||||
Message msg = consumer.receive(1000);
|
||||
while(msg != null) {
|
||||
assertNotNull(msg);
|
||||
assertTrue(msg instanceof TextMessage);
|
||||
TextMessage textMessage = (TextMessage) msg;
|
||||
LOG.debug(">>>> Received [{}]", textMessage.getText());
|
||||
messagesReceived++;
|
||||
msg = consumer.receive(5000);
|
||||
msg = consumer.receive(1000);
|
||||
}
|
||||
consumer.close();
|
||||
|
||||
|
@ -241,23 +250,25 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
brokerService.setPersistent(true);
|
||||
brokerService.setPersistenceAdapter(kaha);
|
||||
brokerService.setAdvisorySupport(false);
|
||||
brokerService.setUseJmx(false);
|
||||
brokerService.setUseJmx(true);
|
||||
brokerService.getManagementContext().setCreateMBeanServer(false);
|
||||
brokerService.setStoreOpenWireVersion(10);
|
||||
openwireUri = brokerService.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
|
||||
|
||||
// Setup SSL context...
|
||||
final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());
|
||||
File keystore = new File(classesDir, "../../src/test/resources/keystore");
|
||||
final SpringSslContext sslContext = new SpringSslContext();
|
||||
sslContext.setKeyStore(keystore.getCanonicalPath());
|
||||
sslContext.setKeyStorePassword("password");
|
||||
sslContext.setTrustStore(keystore.getCanonicalPath());
|
||||
sslContext.setTrustStorePassword("password");
|
||||
sslContext.afterPropertiesSet();
|
||||
brokerService.setSslContext(sslContext);
|
||||
// final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());
|
||||
// File keystore = new File(classesDir, "../../src/test/resources/keystore");
|
||||
// final SpringSslContext sslContext = new SpringSslContext();
|
||||
// sslContext.setKeyStore(keystore.getCanonicalPath());
|
||||
// sslContext.setKeyStorePassword("password");
|
||||
// sslContext.setTrustStore(keystore.getCanonicalPath());
|
||||
// sslContext.setTrustStorePassword("password");
|
||||
// sslContext.afterPropertiesSet();
|
||||
// brokerService.setSslContext(sslContext);
|
||||
|
||||
addAMQPConnector();
|
||||
brokerService.start();
|
||||
brokerService.waitUntilStarted();
|
||||
this.numberOfMessages = 2000;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,28 +35,24 @@ import javax.jms.Session;
|
|||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class AMQ4920Test extends AmqpTestSupport {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AMQ4920Test.class);
|
||||
private static final Integer ITERATIONS = 1 * 1000;
|
||||
private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are required to reproduce the original issue
|
||||
private static final Integer ITERATIONS = 500;
|
||||
private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are
|
||||
// required to reproduce
|
||||
// the original issue
|
||||
public static final String TEXT_MESSAGE = "TextMessage: ";
|
||||
private final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
|
||||
private final CountDownLatch initLatch = new CountDownLatch(CONSUMER_COUNT);
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Test(timeout = 1 * 60 * 1000)
|
||||
@Test(timeout = 60000)
|
||||
public void testSendWithMultipleConsumers() throws Exception {
|
||||
ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
|
||||
ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
|
||||
connectionFactory.setSyncPublish(false);
|
||||
Connection connection = connectionFactory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
String destinationName = "topic://AMQ4920Test" + System.currentTimeMillis();
|
||||
|
@ -64,9 +60,8 @@ public class AMQ4920Test extends AmqpTestSupport {
|
|||
connection.start();
|
||||
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
for (int i=0; i < CONSUMER_COUNT; i++) {
|
||||
AMQ4930ConsumerTask consumerTask =
|
||||
new AMQ4930ConsumerTask(initLatch, destinationName, port, "Consumer-" + i, latch, ITERATIONS);
|
||||
for (int i = 0; i < CONSUMER_COUNT; i++) {
|
||||
AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(initLatch, destinationName, port, "Consumer-" + i, latch, ITERATIONS);
|
||||
executor.submit(consumerTask);
|
||||
}
|
||||
connection.start();
|
||||
|
@ -92,7 +87,7 @@ public class AMQ4920Test extends AmqpTestSupport {
|
|||
for (int i = 0; i < count; i++) {
|
||||
TextMessage message = session.createTextMessage();
|
||||
message.setText(TEXT_MESSAGE + i);
|
||||
LOG.debug("Sending message [" + i + "]");
|
||||
LOG.trace("Sending message [" + i + "]");
|
||||
producer.send(message);
|
||||
if (sleepInterval > 0) {
|
||||
Thread.sleep(sleepInterval);
|
||||
|
@ -112,7 +107,7 @@ class AMQ4930ConsumerTask implements Callable<Boolean> {
|
|||
private final int expectedMessageCount;
|
||||
private final CountDownLatch started;
|
||||
|
||||
public AMQ4930ConsumerTask (CountDownLatch started, String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
|
||||
public AMQ4930ConsumerTask(CountDownLatch started, String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
|
||||
this.started = started;
|
||||
this.destinationName = destinationName;
|
||||
this.port = port;
|
||||
|
@ -124,7 +119,7 @@ class AMQ4930ConsumerTask implements Callable<Boolean> {
|
|||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
LOG.debug(consumerName + " starting");
|
||||
Connection connection=null;
|
||||
Connection connection = null;
|
||||
try {
|
||||
ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
|
||||
connection = connectionFactory.createConnection();
|
||||
|
@ -136,8 +131,8 @@ class AMQ4930ConsumerTask implements Callable<Boolean> {
|
|||
started.countDown();
|
||||
|
||||
int receivedCount = 0;
|
||||
while(receivedCount < expectedMessageCount) {
|
||||
Message message = consumer.receive(5 * 1000);
|
||||
while (receivedCount < expectedMessageCount) {
|
||||
Message message = consumer.receive(2000);
|
||||
if (message == null) {
|
||||
LOG.error("consumer {} got null message on iteration {}", consumerName, receivedCount);
|
||||
return false;
|
||||
|
@ -151,8 +146,7 @@ class AMQ4930ConsumerTask implements Callable<Boolean> {
|
|||
LOG.error("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
|
||||
return false;
|
||||
}
|
||||
LOG.debug("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText()); // TODO make debug
|
||||
|
||||
LOG.trace("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
|
||||
messagesReceived.countDown();
|
||||
receivedCount++;
|
||||
}
|
||||
|
@ -168,4 +162,3 @@ class AMQ4930ConsumerTask implements Callable<Boolean> {
|
|||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import java.net.Socket;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
|
||||
|
@ -99,10 +100,10 @@ public class AmqpConnectTimeoutTest extends AmqpTestSupport {
|
|||
|
||||
@Override
|
||||
public String getAdditionalConfig() {
|
||||
return "&transport.connectAttemptTimeout=2000";
|
||||
return "&transport.connectAttemptTimeout=1200";
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
@Test(timeout = 30000)
|
||||
public void testInactivityMonitor() throws Exception {
|
||||
|
||||
Thread t1 = new Thread() {
|
||||
|
@ -127,7 +128,7 @@ public class AmqpConnectTimeoutTest extends AmqpTestSupport {
|
|||
public boolean isSatisified() throws Exception {
|
||||
return 1 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
|
||||
}
|
||||
}));
|
||||
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
|
||||
|
||||
// and it should be closed due to inactivity
|
||||
assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
|
||||
|
@ -135,7 +136,7 @@ public class AmqpConnectTimeoutTest extends AmqpTestSupport {
|
|||
public boolean isSatisified() throws Exception {
|
||||
return 0 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
|
||||
}
|
||||
}));
|
||||
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500)));
|
||||
|
||||
assertTrue("no exceptions", exceptions.isEmpty());
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ public class AmqpTestSupport {
|
|||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
LOG.info("========== start " + getTestName() + " ==========");
|
||||
exceptions.clear();
|
||||
if (killHungThreads("setUp")) {
|
||||
LOG.warn("HUNG THREADS in setUp");
|
||||
|
@ -101,9 +102,11 @@ public class AmqpTestSupport {
|
|||
protected void createBroker(boolean deleteAllMessages) throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
brokerService.setPersistent(false);
|
||||
brokerService.setSchedulerSupport(false);
|
||||
brokerService.setAdvisorySupport(false);
|
||||
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||
brokerService.setUseJmx(true);
|
||||
brokerService.getManagementContext().setCreateMBeanServer(false);
|
||||
|
||||
SSLContext ctx = SSLContext.getInstance("TLS");
|
||||
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
|
||||
|
@ -215,6 +218,7 @@ public class AmqpTestSupport {
|
|||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
LOG.info("========== tearDown " + getTestName() + " ==========");
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
Future<Boolean> future = executor.submit(new TearDownTask());
|
||||
try {
|
||||
|
@ -266,6 +270,10 @@ public class AmqpTestSupport {
|
|||
session.close();
|
||||
}
|
||||
|
||||
public String getTestName() {
|
||||
return name.getMethodName();
|
||||
}
|
||||
|
||||
protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
|
||||
ObjectName brokerViewMBean = new ObjectName(
|
||||
"org.apache.activemq:type=Broker,brokerName=localhost");
|
||||
|
|
|
@ -210,6 +210,7 @@ public class AmqpTransformerTest {
|
|||
brokerService = new BrokerService();
|
||||
brokerService.setPersistent(false);
|
||||
brokerService.setAdvisorySupport(false);
|
||||
brokerService.setUseJmx(false);
|
||||
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||
|
||||
TransportConnector connector = brokerService.addConnector(amqpUrl);
|
||||
|
|
|
@ -51,8 +51,6 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
|
|||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.objectweb.jtests.jms.framework.TestConfig;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -64,21 +62,6 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
|
||||
protected java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM");
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
LOG.debug("in setUp of {}", name.getMethodName());
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
LOG.debug("in tearDown of {}", name.getMethodName());
|
||||
super.tearDown();
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test(timeout=30000)
|
||||
public void testProducerConsume() throws Exception {
|
||||
|
@ -228,7 +211,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
for(int i = 1; i <= consumeBeforeRollback; i++) {
|
||||
for (int i = 1; i <= consumeBeforeRollback; i++) {
|
||||
Message message = consumer.receive(1000);
|
||||
assertNotNull(message);
|
||||
assertEquals("Unexpected message number", i, message.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER));
|
||||
|
@ -240,11 +223,11 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
|
||||
// Consume again..check we receive all the messages.
|
||||
Set<Integer> messageNumbers = new HashSet<Integer>();
|
||||
for(int i = 1; i <= totalCount; i++) {
|
||||
for (int i = 1; i <= totalCount; i++) {
|
||||
messageNumbers.add(i);
|
||||
}
|
||||
|
||||
for(int i = 1; i <= totalCount; i++) {
|
||||
for (int i = 1; i <= totalCount; i++) {
|
||||
Message message = consumer.receive(1000);
|
||||
assertNotNull(message);
|
||||
int msgNum = message.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER);
|
||||
|
@ -261,7 +244,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
|
||||
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
final int msgCount = 500;
|
||||
final int msgCount = 300;
|
||||
|
||||
connection = createConnection();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
@ -374,7 +357,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
};
|
||||
synchronized(producer) {
|
||||
new Thread(t).start();
|
||||
//wait until we know that the producer was able to send a message
|
||||
// wait until we know that the producer was able to send a message
|
||||
producer.wait(10000);
|
||||
}
|
||||
|
||||
|
@ -389,7 +372,6 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
final Queue queue = session.createQueue(getDestinationName());
|
||||
connection.start();
|
||||
|
||||
|
||||
Testable t = new Testable() {
|
||||
@Override
|
||||
public synchronized void run() {
|
||||
|
@ -570,8 +552,8 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
}
|
||||
|
||||
@Test(timeout=30 * 1000)
|
||||
public void testProduceAndConsumeLargeNumbersOfMessages() throws JMSException {
|
||||
int count = 2000;
|
||||
public void testProduceAndConsumeLargeNumbersOfMessages() throws Exception {
|
||||
int count = 1000;
|
||||
connection = createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(getDestinationName());
|
||||
|
@ -583,16 +565,14 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
producer.send(m);
|
||||
}
|
||||
|
||||
MessageConsumer consumer=session.createConsumer(queue);
|
||||
MessageConsumer consumer=session.createConsumer(queue);
|
||||
for(int i = 0; i < count; i++) {
|
||||
Message message = consumer.receive(5000);
|
||||
assertNotNull(message);
|
||||
System.out.println(((TextMessage) message).getText());
|
||||
assertEquals("Test-Message:" + i,((TextMessage) message).getText());
|
||||
}
|
||||
|
||||
Message message = consumer.receive(500);
|
||||
assertNull(message);
|
||||
assertNull(consumer.receiveNoWait());
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
|
@ -670,7 +650,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
msg.set(consumer.receiveNoWait());
|
||||
return msg.get() != null;
|
||||
}
|
||||
}));
|
||||
}, TimeUnit.SECONDS.toMillis(25), TimeUnit.MILLISECONDS.toMillis(200)));
|
||||
|
||||
assertNotNull("Should have received a message by now.", msg.get());
|
||||
assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
|
||||
|
@ -807,7 +787,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
public void testSessionTransactedCommit() throws JMSException, InterruptedException {
|
||||
public void testSessionTransactedCommit() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
|
||||
connection = createConnection();
|
||||
|
@ -824,24 +804,19 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
}
|
||||
|
||||
// No commit in place, so no message should be dispatched.
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
TextMessage m = (TextMessage) consumer.receive(500);
|
||||
|
||||
assertNull(m);
|
||||
QueueViewMBean queueView = getProxyToQueue(getDestinationName());
|
||||
assertEquals(0, queueView.getQueueSize());
|
||||
|
||||
session.commit();
|
||||
|
||||
// Messages should be available now.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Message msg = consumer.receive(5000);
|
||||
assertNotNull(msg);
|
||||
}
|
||||
// No commit in place, so no message should be dispatched.
|
||||
assertEquals(10, queueView.getQueueSize());
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
public void testSessionTransactedRollback() throws JMSException, InterruptedException {
|
||||
public void testSessionTransactedRollback() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
|
||||
connection = createConnection();
|
||||
|
@ -859,10 +834,14 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
|
||||
session.rollback();
|
||||
|
||||
// No commit in place, so no message should be dispatched.
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
TextMessage m = (TextMessage) consumer.receive(500);
|
||||
assertNull(m);
|
||||
|
||||
// No commit in place, so no message should be dispatched.
|
||||
QueueViewMBean queueView = getProxyToQueue(getDestinationName());
|
||||
assertEquals(0, queueView.getQueueSize());
|
||||
|
||||
assertNull(consumer.receive(100));
|
||||
consumer.close();
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
@ -922,7 +901,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
return broker.getInactiveDurableTopicSubscribers().length == 0 &&
|
||||
broker.getDurableTopicSubscribers().length == 1;
|
||||
}
|
||||
}));
|
||||
}, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250)));
|
||||
|
||||
consumer.close();
|
||||
|
||||
|
@ -933,7 +912,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
return broker.getInactiveDurableTopicSubscribers().length == 1 &&
|
||||
broker.getDurableTopicSubscribers().length == 0;
|
||||
}
|
||||
}));
|
||||
}, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250)));
|
||||
|
||||
session.unsubscribe("DurbaleTopic");
|
||||
assertTrue(Wait.waitFor(new Wait.Condition() {
|
||||
|
@ -943,7 +922,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
return broker.getInactiveDurableTopicSubscribers().length == 0 &&
|
||||
broker.getDurableTopicSubscribers().length == 0;
|
||||
}
|
||||
}));
|
||||
}, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250)));
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
|
@ -964,7 +943,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
return broker.getInactiveDurableTopicSubscribers().length == 0 &&
|
||||
broker.getDurableTopicSubscribers().length == 0;
|
||||
}
|
||||
}));
|
||||
}, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250)));
|
||||
|
||||
try {
|
||||
session.unsubscribe("DurbaleTopic");
|
||||
|
@ -993,7 +972,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
return broker.getInactiveDurableTopicSubscribers().length == 0 &&
|
||||
broker.getDurableTopicSubscribers().length == 1;
|
||||
}
|
||||
}));
|
||||
}, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250)));
|
||||
|
||||
try {
|
||||
session.unsubscribe("DurbaleTopic");
|
||||
|
|
|
@ -67,8 +67,16 @@ public class JmsClientRequestResponseTest extends AmqpTestSupport implements Mes
|
|||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
requestorConnection.close();
|
||||
responderConnection.close();
|
||||
if (requestorConnection != null) {
|
||||
try {
|
||||
requestorConnection.close();
|
||||
} catch (Exception e) {}
|
||||
}
|
||||
if (responderConnection != null) {
|
||||
try {
|
||||
responderConnection.close();
|
||||
} catch (Exception e) {}
|
||||
}
|
||||
|
||||
if (syncThread != null) {
|
||||
syncThread.join(5000);
|
||||
|
|
|
@ -39,10 +39,10 @@ public class JmsMessageGroupsTest extends JMSClientTestSupport {
|
|||
private static final int ITERATIONS = 10;
|
||||
private static final int MESSAGE_COUNT = 10;
|
||||
private static final int MESSAGE_SIZE = 200 * 1024;
|
||||
private static final int RECEIVE_TIMEOUT = 5000;
|
||||
private static final int RECEIVE_TIMEOUT = 3000;
|
||||
private static final String JMSX_GROUP_ID = "JmsGroupsTest";
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
@Test(timeout = 60000)
|
||||
public void testGroupSeqIsNeverLost() throws Exception {
|
||||
AtomicInteger sequenceCounter = new AtomicInteger();
|
||||
|
||||
|
@ -64,11 +64,11 @@ public class JmsMessageGroupsTest extends JMSClientTestSupport {
|
|||
for (int i = 0; i < MESSAGE_COUNT; ++i) {
|
||||
Message message = consumer.receive(RECEIVE_TIMEOUT);
|
||||
assertNotNull(message);
|
||||
LOG.info("Read message #{}: type = {}", i, message.getClass().getSimpleName());
|
||||
LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName());
|
||||
String gid = message.getStringProperty("JMSXGroupID");
|
||||
String seq = message.getStringProperty("JMSXGroupSeq");
|
||||
LOG.info("Message assigned JMSXGroupID := {}", gid);
|
||||
LOG.info("Message assigned JMSXGroupSeq := {}", seq);
|
||||
LOG.debug("Message assigned JMSXGroupID := {}", gid);
|
||||
LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
|
||||
}
|
||||
|
||||
consumer.close();
|
||||
|
|
|
@ -77,7 +77,7 @@ public class SimpleAMQPAuthTest {
|
|||
}
|
||||
});
|
||||
connection.start();
|
||||
Thread.sleep(1000);
|
||||
Thread.sleep(500);
|
||||
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
fail("Expected JMSException");
|
||||
} catch (JMSException e) {
|
||||
|
|
|
@ -58,7 +58,7 @@ public class AMQ4914Test extends AmqpTestSupport {
|
|||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSendSmallerMessages() throws JMSException {
|
||||
for (int i = 512; i <= (16 * 1024); i += 512) {
|
||||
for (int i = 512; i <= (8 * 1024); i += 512) {
|
||||
doTestSendLargeMessage(i);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,7 +90,8 @@ public class ActiveMQAdmin implements Admin {
|
|||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
|
||||
return BrokerFactory.createBroker(new URI("broker://()/localhost" +
|
||||
"?persistent=false&useJmx=false&advisorySupport=false&schedulerSupport=false"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.io.InputStream;
|
|||
import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
|
@ -185,7 +186,7 @@ public class UnsupportedClientTest extends AmqpTestSupport {
|
|||
return true;
|
||||
}
|
||||
}
|
||||
}));
|
||||
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
|
||||
}
|
||||
|
||||
private ClientConnection createClientConnection(boolean ssl) {
|
||||
|
|
|
@ -20,8 +20,8 @@
|
|||
#
|
||||
log4j.rootLogger=WARN, console, file
|
||||
log4j.logger.org.apache.activemq=INFO
|
||||
log4j.logger.org.apache.activemq.transport.amqp=DEBUG
|
||||
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=DEBUG
|
||||
log4j.logger.org.apache.activemq.transport.amqp=INFO
|
||||
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
|
||||
log4j.logger.org.fusesource=INFO
|
||||
|
||||
# Console will only display warnnings
|
||||
|
|
|
@ -25,8 +25,7 @@
|
|||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
|
||||
|
||||
|
||||
<broker useJmx="true" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true" schedulePeriodForDestinationPurge="2000">
|
||||
<broker useJmx="false" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true">
|
||||
|
||||
<destinations>
|
||||
<queue physicalName="TEST.Q" />
|
||||
|
@ -43,42 +42,13 @@
|
|||
</transportConnectors>
|
||||
|
||||
<plugins>
|
||||
|
||||
<simpleAuthenticationPlugin>
|
||||
<users>
|
||||
<authenticationUser username="system" password="systemPassword" groups="users,admins"/>
|
||||
<authenticationUser username="user" password="userPassword" groups="users"/>
|
||||
<authenticationUser username="guest" password="guestPassword" groups="guests"/>
|
||||
</users>
|
||||
</simpleAuthenticationPlugin>
|
||||
|
||||
|
||||
|
||||
<!-- lets configure a destination based authorization mechanism -->
|
||||
<!--
|
||||
<authorizationPlugin>
|
||||
<map>
|
||||
<authorizationMap>
|
||||
<authorizationEntries>
|
||||
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
|
||||
<authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
|
||||
<authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
|
||||
|
||||
<authorizationEntry queue="TEST.Q" read="guests" write="guests" />
|
||||
|
||||
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
|
||||
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
|
||||
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
|
||||
|
||||
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
|
||||
</authorizationEntries>
|
||||
<tempDestinationAuthorizationEntry>
|
||||
<tempDestinationAuthorizationEntry read="admins" write="admins" admin="admins"/>
|
||||
</tempDestinationAuthorizationEntry>
|
||||
</authorizationMap>
|
||||
</map>
|
||||
</authorizationPlugin>
|
||||
-->
|
||||
<simpleAuthenticationPlugin>
|
||||
<users>
|
||||
<authenticationUser username="system" password="systemPassword" groups="users,admins"/>
|
||||
<authenticationUser username="user" password="userPassword" groups="users"/>
|
||||
<authenticationUser username="guest" password="guestPassword" groups="guests"/>
|
||||
</users>
|
||||
</simpleAuthenticationPlugin>
|
||||
</plugins>
|
||||
</broker>
|
||||
|
||||
|
|
|
@ -21,10 +21,10 @@ package org.apache.activemq.util;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class Wait {
|
||||
|
||||
|
||||
public static final long MAX_WAIT_MILLIS = 30*1000;
|
||||
public static final int SLEEP_MILLIS = 1000;
|
||||
|
||||
public static final long SLEEP_MILLIS = 1000;
|
||||
|
||||
public interface Condition {
|
||||
boolean isSatisified() throws Exception;
|
||||
}
|
||||
|
@ -37,14 +37,14 @@ public class Wait {
|
|||
return waitFor(condition, duration, SLEEP_MILLIS);
|
||||
}
|
||||
|
||||
public static boolean waitFor(final Condition condition, final long duration, final int sleepMillis) throws Exception {
|
||||
public static boolean waitFor(final Condition condition, final long duration, final long sleepMillis) throws Exception {
|
||||
|
||||
final long expiry = System.currentTimeMillis() + duration;
|
||||
boolean conditionSatisified = condition.isSatisified();
|
||||
while (!conditionSatisified && System.currentTimeMillis() < expiry) {
|
||||
TimeUnit.MILLISECONDS.sleep(sleepMillis);
|
||||
conditionSatisified = condition.isSatisified();
|
||||
}
|
||||
}
|
||||
return conditionSatisified;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue