Change some of the amqp tests to be more event driven reducing the time by several minutes

This commit is contained in:
Daniel Kulp 2014-07-21 15:54:43 -04:00
parent 4de5219a8f
commit 7fe23bce62
3 changed files with 161 additions and 115 deletions

View File

@ -1,4 +1,4 @@
/** >>>>>> pumping
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -46,9 +46,11 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -282,6 +284,16 @@ public class JMSClientTest extends JMSClientTestSupport {
assertEquals("hello + 9", ((TextMessage) msg).getText());
}
}
abstract class Testable implements Runnable {
protected String msg;
synchronized boolean passed() {
if (msg != null) {
fail(msg);
}
return true;
}
}
@Test(timeout=30000)
public void testProducerThrowsWhenBrokerStops() throws Exception {
@ -291,63 +303,70 @@ public class JMSClientTest extends JMSClientTestSupport {
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageProducer producer = session.createProducer(queue);
final MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = session.createTextMessage("Sample text");
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
final Message m = session.createTextMessage("Sample text");
Testable t = new Testable() {
public synchronized void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
for (int i = 0; i < 30; ++i) {
producer.send(m);
synchronized (producer) {
producer.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
msg = "Should have thrown an IllegalStateException";
} catch (Exception ex) {
LOG.info("Caught exception on send: {}", ex);
}
}
});
stopper.start();
try {
for (int i = 0; i < 10; ++i) {
producer.send(m);
TimeUnit.SECONDS.sleep(1);
}
fail("Should have thrown an IllegalStateException");
} catch (Exception ex) {
LOG.info("Caught exception on send: {}", ex);
};
synchronized(producer) {
new Thread(t).start();
//wait until we know that the producer was able to send a message
producer.wait(10000);
}
stopBroker();
assertTrue(t.passed());
}
@Test(timeout=30000)
public void testProducerCreateThrowsWhenBrokerStops() throws Exception {
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue(getDestinationName());
connection.start();
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
Testable t = new Testable() {
public synchronized void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
for (int i = 0; i < 10; ++i) {
MessageProducer producer = session.createProducer(queue);
synchronized (session) {
session.notifyAll();
}
if (producer == null) {
msg = "Producer should not be null";
}
TimeUnit.SECONDS.sleep(1);
}
msg = "Should have thrown an IllegalStateException";
} catch (Exception ex) {
LOG.info("Caught exception on create producer: {}", ex);
}
}
});
stopper.start();
try {
for (int i = 0; i < 10; ++i) {
MessageProducer producer = session.createProducer(queue);
assertNotNull(producer);
TimeUnit.SECONDS.sleep(1);
}
fail("Should have thrown an IllegalStateException");
} catch (Exception ex) {
LOG.info("Caught exception on create producer: {}", ex);
};
synchronized (session) {
new Thread(t).start();
session.wait(10000);
}
stopBroker();
assertTrue(t.passed());
}
@Test(timeout=30000)
@ -379,28 +398,30 @@ public class JMSClientTest extends JMSClientTestSupport {
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageConsumer consumer=session.createConsumer(queue);
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
final MessageConsumer consumer=session.createConsumer(queue);
Testable t = new Testable() {
public synchronized void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
for (int i = 0; i < 10; ++i) {
consumer.receiveNoWait();
synchronized (consumer) {
consumer.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(1000 + (i * 100));
}
msg = "Should have thrown an IllegalStateException";
} catch (Exception ex) {
LOG.info("Caught exception on receiveNoWait: {}", ex);
}
}
});
stopper.start();
try {
for (int i = 0; i < 10; ++i) {
consumer.receiveNoWait();
TimeUnit.MILLISECONDS.sleep(1000 + (i * 100));
}
fail("Should have thrown an IllegalStateException");
} catch (Exception ex) {
LOG.info("Caught exception on receiveNoWait: {}", ex);
};
synchronized (consumer) {
new Thread(t).start();
consumer.wait(10000);
}
stopBroker();
assertTrue(t.passed());
}
@Test(timeout=60000)
@ -410,27 +431,29 @@ public class JMSClientTest extends JMSClientTestSupport {
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageConsumer consumer=session.createConsumer(queue);
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
final MessageConsumer consumer=session.createConsumer(queue);
Testable t = new Testable() {
public synchronized void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
for (int i = 0; i < 10; ++i) {
consumer.receive(100 + (i * 1000));
synchronized (consumer) {
consumer.notifyAll();
}
}
msg = "Should have thrown an IllegalStateException";
} catch (Exception ex) {
LOG.info("Caught exception on receive(1000): {}", ex);
}
}
});
stopper.start();
try {
for (int i = 0; i < 10; ++i) {
consumer.receive(1000 + (i * 100));
}
fail("Should have thrown an IllegalStateException");
} catch (Exception ex) {
LOG.info("Caught exception on receive(1000): {}", ex);
};
synchronized (consumer) {
new Thread(t).start();
consumer.wait(10000);
consumer.notifyAll();
}
stopBroker();
assertTrue(t.passed());
}
@Test(timeout=30000)
@ -440,25 +463,35 @@ public class JMSClientTest extends JMSClientTestSupport {
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageConsumer consumer=session.createConsumer(queue);
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
final MessageConsumer consumer=session.createConsumer(queue);
Testable t = new Testable() {
public synchronized void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
Message m = consumer.receive(1);
synchronized (consumer) {
consumer.notifyAll();
if (m != null) {
msg = "Should have returned null";
return;
}
}
m = consumer.receive();
if (m != null) {
msg = "Should have returned null";
}
} catch (Exception ex) {
LOG.info("Caught exception on receive(1000): {}", ex);
}
}
});
stopper.start();
try {
Message m = consumer.receive();
assertNull(m);
} catch (Exception ex) {
LOG.info("Caught exception on receive(1000): {}", ex);
};
synchronized (consumer) {
new Thread(t).start();
consumer.wait(10000);
}
stopBroker();
assertTrue(t.passed());
}
@Test(timeout=30000)
@ -506,7 +539,7 @@ public class JMSClientTest extends JMSClientTestSupport {
assertEquals("Test-Message:" + i,((TextMessage) message).getText());
}
Message message = consumer.receive(5000);
Message message = consumer.receive(500);
assertNull(message);
}
@ -697,7 +730,7 @@ public class JMSClientTest extends JMSClientTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
connection = createConnection();
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
final CountDownLatch called = new CountDownLatch(1);
@ -710,19 +743,11 @@ public class JMSClientTest extends JMSClientTestSupport {
called.countDown();
}
});
//This makes sure the connection is completely up and connected
s.createTemporaryQueue().delete();
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
}
});
stopper.start();
stopBroker();
assertTrue("No exception listener event fired.", called.await(15, TimeUnit.SECONDS));
}
@ -745,7 +770,7 @@ public class JMSClientTest extends JMSClientTestSupport {
// No commit in place, so no message should be dispatched.
MessageConsumer consumer = session.createConsumer(queue);
TextMessage m = (TextMessage) consumer.receive(5000);
TextMessage m = (TextMessage) consumer.receive(500);
assertNull(m);
@ -781,7 +806,7 @@ public class JMSClientTest extends JMSClientTestSupport {
// No commit in place, so no message should be dispatched.
MessageConsumer consumer = session.createConsumer(queue);
TextMessage m = (TextMessage) consumer.receive(5000);
TextMessage m = (TextMessage) consumer.receive(500);
assertNull(m);
session.close();

View File

@ -23,11 +23,14 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import org.apache.activemq.transport.amqp.DefaultTrustManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.objectweb.jtests.jms.conform.connection.ConnectionTest;
import org.objectweb.jtests.jms.conform.connection.TopicConnectionTest;
import org.objectweb.jtests.jms.conform.message.MessageBodyTest;
@ -76,14 +79,21 @@ public class JoramJmsNioPlusSslTest {
@Rule
public Timeout to = new Timeout(10 * 1000);
private static SSLContext def;
@BeforeClass
public static void beforeClass() throws Exception {
System.setProperty("joram.jms.test.file", getJmsTestFileName());
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, null);
def = SSLContext.getDefault();
SSLContext.setDefault(ctx);
}
@AfterClass
public static void afterClass() throws Exception {
System.clearProperty("joram.jms.test.file");
SSLContext.setDefault(def);
}
public static String getJmsTestFileName() {
return "providerNIOPlusSSL.properties";

View File

@ -23,11 +23,14 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import org.apache.activemq.transport.amqp.DefaultTrustManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.objectweb.jtests.jms.conform.connection.ConnectionTest;
import org.objectweb.jtests.jms.conform.message.MessageBodyTest;
import org.objectweb.jtests.jms.conform.message.MessageDefaultTest;
@ -75,15 +78,23 @@ public class JoramSslTest {
@Rule
public Timeout to = new Timeout(10 * 1000);
static SSLContext def;
@BeforeClass
public static void beforeClass() throws Exception {
System.setProperty("joram.jms.test.file", getJmsTestFileName());
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, null);
def = SSLContext.getDefault();
SSLContext.setDefault(ctx);
}
@AfterClass
public static void afterClass() throws Exception {
System.clearProperty("joram.jms.test.file");
SSLContext.setDefault(def);
}
public static String getJmsTestFileName() {
return "providerSSL.properties";