Reduce test time, add timeouts etc.
This commit is contained in:
Timothy Bish 2015-02-16 18:50:24 -05:00
parent 94937e855a
commit 388c16d084
8 changed files with 131 additions and 92 deletions

View File

@ -31,11 +31,9 @@ import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.security.JaasDualAuthenticationPlugin; import org.apache.activemq.security.JaasDualAuthenticationPlugin;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -60,7 +58,7 @@ public class ConnectTest {
} }
} }
@Test @Test(timeout = 60000)
public void testStompConnectLeak() throws Exception { public void testStompConnectLeak() throws Exception {
brokerService.addConnector("stomp://0.0.0.0:0?transport.soLinger=0"); brokerService.addConnector("stomp://0.0.0.0:0?transport.soLinger=0");
@ -84,11 +82,12 @@ public class ConnectTest {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return 0 == brokerService.getTransportConnectors().get(0).connectionCount(); return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
} }
})); }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(200)));
assertTrue("no exceptions", exceptions.isEmpty()); assertTrue("no exceptions", exceptions.isEmpty());
} }
@Test @Test(timeout = 60000)
public void testJaasDualStopWithOpenConnection() throws Exception { public void testJaasDualStopWithOpenConnection() throws Exception {
brokerService.setPlugins(new BrokerPlugin[]{new JaasDualAuthenticationPlugin()}); brokerService.setPlugins(new BrokerPlugin[]{new JaasDualAuthenticationPlugin()});
@ -120,7 +119,7 @@ public class ConnectTest {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return 1 == brokerService.getTransportConnectors().get(0).connectionCount(); return 1 == brokerService.getTransportConnectors().get(0).connectionCount();
} }
})); }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(200)));
assertTrue("connected on time", doneConnect.await(5, TimeUnit.SECONDS)); assertTrue("connected on time", doneConnect.await(5, TimeUnit.SECONDS));
brokerService.stop(); brokerService.stop();
@ -135,7 +134,7 @@ public class ConnectTest {
assertTrue("no exceptions", exceptions.isEmpty()); assertTrue("no exceptions", exceptions.isEmpty());
} }
@Test @Test(timeout = 60000)
public void testInactivityMonitor() throws Exception { public void testInactivityMonitor() throws Exception {
brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=1000,0&transport.useKeepAlive=false"); brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=1000,0&transport.useKeepAlive=false");
@ -159,11 +158,11 @@ public class ConnectTest {
t1.start(); t1.start();
assertTrue("one connection", Wait.waitFor(new Wait.Condition() { assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return 1 == brokerService.getTransportConnectors().get(0).connectionCount(); return 1 == brokerService.getTransportConnectors().get(0).connectionCount();
} }
})); }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(200)));
// and it should be closed due to inactivity // and it should be closed due to inactivity
assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() { assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
@ -171,7 +170,8 @@ public class ConnectTest {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return 0 == brokerService.getTransportConnectors().get(0).connectionCount(); return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
} }
})); }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(200)));
assertTrue("no exceptions", exceptions.isEmpty()); assertTrue("no exceptions", exceptions.isEmpty());
} }
} }

View File

@ -38,6 +38,7 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.Wait;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -69,7 +70,7 @@ public class Stomp11Test extends StompTestSupport {
port = connector.getConnectUri().getPort(); port = connector.getConnectUri().getPort();
} }
@Test @Test(timeout = 60000)
public void testConnect() throws Exception { public void testConnect() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -93,7 +94,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testConnectedNeverEncoded() throws Exception { public void testConnectedNeverEncoded() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -123,7 +124,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testConnectWithVersionOptions() throws Exception { public void testConnectWithVersionOptions() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -145,7 +146,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testConnectWithValidFallback() throws Exception { public void testConnectWithValidFallback() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -167,7 +168,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testConnectWithInvalidFallback() throws Exception { public void testConnectWithInvalidFallback() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -186,7 +187,7 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(f.indexOf("message:") >= 0); assertTrue(f.indexOf("message:") >= 0);
} }
@Test @Test(timeout = 60000)
public void testHeartbeats() throws Exception { public void testHeartbeats() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -230,7 +231,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testHeartbeatsDropsIdleConnection() throws Exception { public void testHeartbeatsDropsIdleConnection() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -262,7 +263,7 @@ public class Stomp11Test extends StompTestSupport {
assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000); assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000);
} }
@Test @Test(timeout = 60000)
public void testHeartbeatsKeepsConnectionOpen() throws Exception { public void testHeartbeatsKeepsConnectionOpen() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -312,7 +313,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testSendAfterMissingHeartbeat() throws Exception { public void testSendAfterMissingHeartbeat() throws Exception {
String connectFrame = "STOMP\n" + "login:system\n" + String connectFrame = "STOMP\n" + "login:system\n" +
@ -330,7 +331,13 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(f.indexOf("session:") >= 0); assertTrue(f.indexOf("session:") >= 0);
LOG.debug("Broker sent: " + f); LOG.debug("Broker sent: " + f);
Thread.sleep(TimeUnit.SECONDS.toMillis(10)); Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
try { try {
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
@ -343,7 +350,7 @@ public class Stomp11Test extends StompTestSupport {
} }
} }
@Test @Test(timeout = 60000)
public void testRejectInvalidHeartbeats1() throws Exception { public void testRejectInvalidHeartbeats1() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -363,7 +370,7 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(f.indexOf("message:") >= 0); assertTrue(f.indexOf("message:") >= 0);
} }
@Test @Test(timeout = 60000)
public void testRejectInvalidHeartbeats2() throws Exception { public void testRejectInvalidHeartbeats2() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -383,7 +390,7 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(f.indexOf("message:") >= 0); assertTrue(f.indexOf("message:") >= 0);
} }
@Test @Test(timeout = 60000)
public void testRejectInvalidHeartbeats3() throws Exception { public void testRejectInvalidHeartbeats3() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -403,7 +410,7 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(f.indexOf("message:") >= 0); assertTrue(f.indexOf("message:") >= 0);
} }
@Test @Test(timeout = 60000)
public void testSubscribeAndUnsubscribe() throws Exception { public void testSubscribeAndUnsubscribe() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -431,15 +438,16 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(stompFrame.getAction().equals("MESSAGE")); assertTrue(stompFrame.getAction().equals("MESSAGE"));
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL; "receipt:1\n" + "id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
Thread.sleep(4000); stompFrame = stompConnection.receive();
assertTrue(stompFrame.getAction().equals("RECEIPT"));
stompConnection.sendFrame(message); stompConnection.sendFrame(message);
try { try {
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame(2000);
LOG.info("Received frame: " + frame); LOG.info("Received frame: " + frame);
fail("No message should have been received since subscription was removed"); fail("No message should have been received since subscription was removed");
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
@ -449,7 +457,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testSubscribeWithNoId() throws Exception { public void testSubscribeWithNoId() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -476,7 +484,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testUnsubscribeWithNoId() throws Exception { public void testUnsubscribeWithNoId() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -493,10 +501,11 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(f.startsWith("CONNECTED")); assertTrue(f.startsWith("CONNECTED"));
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" + "ack:auto\n\n" + Stomp.NULL; "receipt:1\n" + "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
Thread.sleep(2000); frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("RECEIPT"));
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "\n\n" + Stomp.NULL; frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -508,7 +517,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testAckMessageWithId() throws Exception { public void testAckMessageWithId() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -547,7 +556,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testAckMessageWithNoId() throws Exception { public void testAckMessageWithNoId() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -592,7 +601,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testSubscribeWithWildcardSubscription() throws Exception { public void testSubscribeWithWildcardSubscription() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
"login:system\n" + "login:system\n" +
@ -630,7 +639,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testQueueBrowerSubscription() throws Exception { public void testQueueBrowerSubscription() throws Exception {
final int MSG_COUNT = 10; final int MSG_COUNT = 10;
@ -677,10 +686,11 @@ public class Stomp11Test extends StompTestSupport {
assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null); assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null);
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL; "receipt:1\n" + "id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub); stompConnection.sendFrame(unsub);
Thread.sleep(2000); String receipt = stompConnection.receiveFrame();
assertTrue(receipt.contains("RECEIPT"));
subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL; subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe); stompConnection.sendFrame(subscribe);
@ -697,7 +707,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testSendMessageWithStandardHeadersEncoded() throws Exception { public void testSendMessageWithStandardHeadersEncoded() throws Exception {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
@ -731,7 +741,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testSendMessageWithRepeatedEntries() throws Exception { public void testSendMessageWithRepeatedEntries() throws Exception {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
@ -761,7 +771,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testSubscribeWithMessageSentWithEncodedProperties() throws Exception { public void testSubscribeWithMessageSentWithEncodedProperties() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n" + "accept-version:1.1" + "\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n" + "accept-version:1.1" + "\n\n" + Stomp.NULL;
@ -790,7 +800,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testNackMessage() throws Exception { public void testNackMessage() throws Exception {
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -846,7 +856,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testHeaderValuesAreNotWSTrimmed() throws Exception { public void testHeaderValuesAreNotWSTrimmed() throws Exception {
stompConnection.setVersion(Stomp.V1_1); stompConnection.setVersion(Stomp.V1_1);
String connectFrame = "STOMP\n" + String connectFrame = "STOMP\n" +
@ -889,7 +899,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testDurableSubAndUnSubOnTwoTopics() throws Exception { public void testDurableSubAndUnSubOnTwoTopics() throws Exception {
stompConnection.setVersion(Stomp.V1_1); stompConnection.setVersion(Stomp.V1_1);
@ -937,9 +947,13 @@ public class Stomp11Test extends StompTestSupport {
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
try { Wait.waitFor(new Wait.Condition() {
Thread.sleep(400);
} catch (InterruptedException e){} @Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
// reconnect and send some messages to the offline subscribers and then try to get // reconnect and send some messages to the offline subscribers and then try to get
// them after subscribing again. // them after subscribing again.
@ -979,7 +993,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testDurableSubAndUnSubFlow() throws Exception { public void testDurableSubAndUnSubFlow() throws Exception {
stompConnection.setVersion(Stomp.V1_1); stompConnection.setVersion(Stomp.V1_1);
@ -1055,14 +1069,11 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test @Test(timeout = 60000)
public void testMultipleDurableSubsWithOfflineMessages() throws Exception { public void testMultipleDurableSubsWithOfflineMessages() throws Exception {
stompConnection.setVersion(Stomp.V1_1); stompConnection.setVersion(Stomp.V1_1);
String domain = "org.apache.activemq"; final BrokerViewMBean view = getProxyToBroker();
ObjectName brokerName = new ObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean view = (BrokerViewMBean)brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
String connectFrame = "STOMP\n" + "login:system\n" + "passcode:manager\n" + String connectFrame = "STOMP\n" + "login:system\n" + "passcode:manager\n" +
"accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL; "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
@ -1102,9 +1113,13 @@ public class Stomp11Test extends StompTestSupport {
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
try { assertTrue(Wait.waitFor(new Wait.Condition() {
Thread.sleep(400);
} catch (InterruptedException e){} @Override
public boolean isSatisified() throws Exception {
return view.getCurrentConnectionsCount() == 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
// reconnect and send some messages to the offline subscribers and then try to get // reconnect and send some messages to the offline subscribers and then try to get
// them after subscribing again. // them after subscribing again.
@ -1112,7 +1127,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(connectFrame); stompConnection.sendFrame(connectFrame);
frame = stompConnection.receiveFrame(); frame = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + frame); LOG.debug("Broker sent: " + frame);
assertTrue(frame.startsWith("CONNECTED")); assertTrue(frame.contains("CONNECTED"));
assertEquals(view.getDurableTopicSubscribers().length, 0); assertEquals(view.getDurableTopicSubscribers().length, 0);
assertEquals(view.getInactiveDurableTopicSubscribers().length, 2); assertEquals(view.getInactiveDurableTopicSubscribers().length, 2);

View File

@ -21,10 +21,12 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.Wait;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -218,9 +220,14 @@ public class Stomp12Test extends StompTestSupport {
frame = "DISCONNECT\n\n" + Stomp.NULL; frame = "DISCONNECT\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
try {
Thread.sleep(400); assertTrue(Wait.waitFor(new Wait.Condition() {
} catch (InterruptedException e){}
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
// reconnect and send some messages to the offline subscribers and then try to get // reconnect and send some messages to the offline subscribers and then try to get
// them after subscribing again. // them after subscribing again.
@ -404,10 +411,11 @@ public class Stomp12Test extends StompTestSupport {
assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null); assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null);
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL; "receipt:1" + "id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub); stompConnection.sendFrame(unsub);
Thread.sleep(2000); StompFrame stompFrame = stompConnection.receive();
assertTrue(stompFrame.getAction().equals("RECEIPT"));
subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL; subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe); stompConnection.sendFrame(subscribe);

View File

@ -20,6 +20,7 @@ package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination; import javax.jms.Destination;
@ -50,9 +51,11 @@ public class StompPrefetchTest extends StompTestSupport {
brokerService.setDestinationPolicy(pMap); brokerService.setDestinationPolicy(pMap);
brokerService.setAdvisorySupport(true); brokerService.setAdvisorySupport(true);
brokerService.setUseJmx(false);
brokerService.setPersistent(false);
} }
@Test @Test(timeout = 60000)
public void testTopicSubPrefetch() throws Exception { public void testTopicSubPrefetch() throws Exception {
stompConnection.connect("system", "manager"); stompConnection.connect("system", "manager");
@ -61,7 +64,7 @@ public class StompPrefetchTest extends StompTestSupport {
verifyPrefetch(10, new ActiveMQTopic("T")); verifyPrefetch(10, new ActiveMQTopic("T"));
} }
@Test @Test(timeout = 60000)
public void testDurableSubPrefetch() throws Exception { public void testDurableSubPrefetch() throws Exception {
stompConnection.connect("system", "manager"); stompConnection.connect("system", "manager");
HashMap<String,String> headers = new HashMap<String, String>(); HashMap<String,String> headers = new HashMap<String, String>();
@ -71,7 +74,7 @@ public class StompPrefetchTest extends StompTestSupport {
verifyPrefetch(10, new ActiveMQTopic("T")); verifyPrefetch(10, new ActiveMQTopic("T"));
} }
@Test @Test(timeout = 60000)
public void testQBrowserSubPrefetch() throws Exception { public void testQBrowserSubPrefetch() throws Exception {
HashMap<String,String> headers = new HashMap<String, String>(); HashMap<String,String> headers = new HashMap<String, String>();
headers.put("login","system"); headers.put("login","system");
@ -86,7 +89,7 @@ public class StompPrefetchTest extends StompTestSupport {
verifyPrefetch(10, new ActiveMQQueue("Q")); verifyPrefetch(10, new ActiveMQQueue("Q"));
} }
@Test @Test(timeout = 60000)
public void testQueueSubPrefetch() throws Exception { public void testQueueSubPrefetch() throws Exception {
stompConnection.connect("system", "manager"); stompConnection.connect("system", "manager");
stompConnection.subscribe("/queue/Q", Stomp.Headers.Subscribe.AckModeValues.AUTO); stompConnection.subscribe("/queue/Q", Stomp.Headers.Subscribe.AckModeValues.AUTO);
@ -107,7 +110,6 @@ public class StompPrefetchTest extends StompTestSupport {
} }
return false; return false;
} }
})); }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
} }
} }

View File

@ -602,7 +602,7 @@ public class StompTest extends StompTestSupport {
} }
// sleep a while before publishing another set of messages // sleep a while before publishing another set of messages
TimeUnit.SECONDS.sleep(1); TimeUnit.MILLISECONDS.sleep(500);
for (int i = 0; i < ctr; ++i) { for (int i = 0; i < ctr; ++i) {
data[i] = getName() + ":second:" + i; data[i] = getName() + ":second:" + i;
@ -781,7 +781,7 @@ public class StompTest extends StompTestSupport {
LOG.info("queueView, enqueue:" + queueView.getEnqueueCount() +", dequeue:" + queueView.getDequeueCount() + ", inflight:" + queueView.getInFlightCount()); LOG.info("queueView, enqueue:" + queueView.getEnqueueCount() +", dequeue:" + queueView.getDequeueCount() + ", inflight:" + queueView.getInFlightCount());
return queueView.getDequeueCount() == 1; return queueView.getDequeueCount() == 1;
} }
})); }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -1533,9 +1533,13 @@ public class StompTest extends StompTestSupport {
// disconnect // disconnect
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
try { Wait.waitFor(new Wait.Condition() {
Thread.sleep(1000);
} catch (InterruptedException e){} @Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
//reconnect //reconnect
stompConnect(); stompConnect();
@ -1556,7 +1560,7 @@ public class StompTest extends StompTestSupport {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return view.getDurableTopicSubscribers().length == 0 && view.getInactiveDurableTopicSubscribers().length == 0; return view.getDurableTopicSubscribers().length == 0 && view.getInactiveDurableTopicSubscribers().length == 0;
} }
}); }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
assertEquals(view.getDurableTopicSubscribers().length, 0); assertEquals(view.getDurableTopicSubscribers().length, 0);
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0); assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
@ -2183,7 +2187,7 @@ public class StompTest extends StompTestSupport {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return brokerService.getBroker().getClients().length == expected; return brokerService.getBroker().getClients().length == expected;
} }
}); }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100));
org.apache.activemq.broker.Connection[] clients = brokerService.getBroker().getClients(); org.apache.activemq.broker.Connection[] clients = brokerService.getBroker().getClients();
int actual = clients.length; int actual = clients.length;

View File

@ -50,9 +50,13 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StompTestSupport { public class StompTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(StompTestSupport.class);
protected final AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {}; protected final AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
protected BrokerService brokerService; protected BrokerService brokerService;
protected int port; protected int port;
@ -90,6 +94,7 @@ public class StompTestSupport {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
LOG.info("========== start " + getName() + " ==========");
autoFailTestSupport.startAutoFailThread(); autoFailTestSupport.startAutoFailThread();
startBroker(); startBroker();
stompConnect(); stompConnect();
@ -97,6 +102,7 @@ public class StompTestSupport {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
LOG.info("========== finished " + getName() + " ==========");
autoFailTestSupport.stopAutoFailThread(); autoFailTestSupport.stopAutoFailThread();
try { try {
stompDisconnect(); stompDisconnect();
@ -167,6 +173,8 @@ public class StompTestSupport {
brokerService.setSchedulerSupport(true); brokerService.setSchedulerSupport(true);
brokerService.setPopulateJMSXUserID(true); brokerService.setPopulateJMSXUserID(true);
brokerService.setSchedulerSupport(true); brokerService.setSchedulerSupport(true);
brokerService.getManagementContext().setCreateConnector(false);
brokerService.getManagementContext().setCreateMBeanServer(false);
JobSchedulerStoreImpl jobStore = new JobSchedulerStoreImpl(); JobSchedulerStoreImpl jobStore = new JobSchedulerStoreImpl();
jobStore.setDirectory(new File("activemq-data")); jobStore.setDirectory(new File("activemq-data"));

View File

@ -66,7 +66,7 @@ public class StompTimeStampingBrokerPluginTest {
broker = new BrokerService(); broker = new BrokerService();
broker.setPersistent(false); broker.setPersistent(false);
broker.setUseJmx(true); broker.setUseJmx(false);
broker.setPlugins(new BrokerPlugin[] {tsbp}); broker.setPlugins(new BrokerPlugin[] {tsbp});
connectionUri = broker.addConnector("tcp://0.0.0.0:0").getPublishableConnectString(); connectionUri = broker.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
String stompConnectionUri = broker.addConnector("stomp://0.0.0.0:0").getPublishableConnectString(); String stompConnectionUri = broker.addConnector("stomp://0.0.0.0:0").getPublishableConnectString();

View File

@ -52,6 +52,8 @@ public class StompVirtualTopicTest extends StompTestSupport {
protected void createBroker() throws Exception { protected void createBroker() throws Exception {
brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost")); brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost"));
brokerService.setUseJmx(true); brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);
brokerService.getManagementContext().setCreateMBeanServer(false);
brokerService.setDeleteAllMessagesOnStartup(true); brokerService.setDeleteAllMessagesOnStartup(true);
File testDataDir = new File("target/activemq-data/StompVirtualTopicTest"); File testDataDir = new File("target/activemq-data/StompVirtualTopicTest");
@ -82,7 +84,7 @@ public class StompVirtualTopicTest extends StompTestSupport {
brokerService.setDestinationPolicy(policyMap); brokerService.setDestinationPolicy(policyMap);
} }
@Test @Test(timeout = 60000)
public void testStompOnVirtualTopics() throws Exception { public void testStompOnVirtualTopics() throws Exception {
LOG.info("Running Stomp Producer"); LOG.info("Running Stomp Producer");
@ -97,7 +99,7 @@ public class StompVirtualTopicTest extends StompTestSupport {
StompFrame frame = stompConnection.receive(); StompFrame frame = stompConnection.receive();
assertTrue(frame.toString().startsWith("CONNECTED")); assertTrue(frame.toString().startsWith("CONNECTED"));
for (int i=0; i<NUM_MSGS-1; i++) { for (int i = 0; i < NUM_MSGS - 1; i++) {
stompConnection.send("/topic/VirtualTopic.FOO", "Hello World {" + (i + 1) + "}"); stompConnection.send("/topic/VirtualTopic.FOO", "Hello World {" + (i + 1) + "}");
} }
@ -112,19 +114,13 @@ public class StompVirtualTopicTest extends StompTestSupport {
msg = stompConnection.receiveFrame(); msg = stompConnection.receiveFrame();
assertTrue(msg.contains("RECEIPT")); assertTrue(msg.contains("RECEIPT"));
// Does the sleep resolve the problem?
try {
Thread.sleep(6000);
} catch (java.lang.InterruptedException e) {
LOG.error(e.getMessage());
}
stompConnection.disconnect(); stompConnection.disconnect();
Thread.sleep(2000); Thread.sleep(1000);
stompConnection.close(); stompConnection.close();
LOG.info("Stomp Producer finished. Waiting for consumer to join."); LOG.info("Stomp Producer finished. Waiting for consumer to join.");
//wait for consumer to shut down // Wait for consumer to shut down
consumer.join(); consumer.join(45000);
LOG.info("Test finished."); LOG.info("Test finished.");
// check if consumer set failMsg, then let the test fail. // check if consumer set failMsg, then let the test fail.
@ -173,9 +169,15 @@ public class StompVirtualTopicTest extends StompTestSupport {
stompConnection.sendFrame("CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL); stompConnection.sendFrame("CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL);
StompFrame frame = stompConnection.receive(); StompFrame frame = stompConnection.receive();
assertTrue(frame.toString().startsWith("CONNECTED")); assertTrue(frame.toString().startsWith("CONNECTED"));
stompConnection.subscribe("/queue/Consumer.A.VirtualTopic.FOO", "auto");
Thread.sleep(2000); HashMap<String, String> headers = new HashMap<String, String>();
headers.put("receipt", "sub-1");
stompConnection.subscribe("/queue/Consumer.A.VirtualTopic.FOO", "auto", headers);
String receipt = stompConnection.receiveFrame();
assertTrue("Should have read a receipt for subscribe", receipt.contains("RECEIPT"));
assertTrue("Receipt contains receipt-id", receipt.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
latch.countDown(); latch.countDown();
for (counter=0; counter<StompVirtualTopicTest.NUM_MSGS; counter++) { for (counter=0; counter<StompVirtualTopicTest.NUM_MSGS; counter++) {
@ -228,7 +230,7 @@ public class StompVirtualTopicTest extends StompTestSupport {
} finally { } finally {
try { try {
stompConnection.disconnect(); stompConnection.disconnect();
Thread.sleep(2000); Thread.sleep(1000);
stompConnection.close(); stompConnection.close();
} catch (Exception e) { } catch (Exception e) {
log.error("unexpected exception on sleep", e); log.error("unexpected exception on sleep", e);