NO-JIRA Improvements on StompStests
Replace some Wait clauses Use lower timeout when results are expected to be null
This commit is contained in:
parent
03f8f83935
commit
f122b5059f
|
@ -166,9 +166,7 @@ public class StompTest extends StompTestBase {
|
|||
}
|
||||
});
|
||||
|
||||
((ActiveMQServerImpl) server).getMonitor()
|
||||
.setMaxUsage(0)
|
||||
.tick();
|
||||
((ActiveMQServerImpl) server).getMonitor().setMaxUsage(0).tick();
|
||||
|
||||
// Connection should be closed by broker when disk is full and attempt to send
|
||||
Exception e = null;
|
||||
|
@ -190,30 +188,20 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
@Test
|
||||
public void testConnect() throws Exception {
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
|
||||
.addHeader(Stomp.Headers.Connect.LOGIN, defUser)
|
||||
.addHeader(Stomp.Headers.Connect.PASSCODE, defPass)
|
||||
.addHeader(Stomp.Headers.Connect.REQUEST_ID, "1");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT).addHeader(Stomp.Headers.Connect.LOGIN, defUser).addHeader(Stomp.Headers.Connect.PASSCODE, defPass).addHeader(Stomp.Headers.Connect.REQUEST_ID, "1");
|
||||
ClientStompFrame response = conn.sendFrame(frame);
|
||||
|
||||
Assert.assertTrue(response.getCommand()
|
||||
.equals(Stomp.Responses.CONNECTED));
|
||||
Assert.assertTrue(response.getHeader(Stomp.Headers.Connected.RESPONSE_ID)
|
||||
.equals("1"));
|
||||
Assert.assertTrue(response.getCommand().equals(Stomp.Responses.CONNECTED));
|
||||
Assert.assertTrue(response.getHeader(Stomp.Headers.Connected.RESPONSE_ID).equals("1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisconnectAndError() throws Exception {
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
|
||||
.addHeader(Stomp.Headers.Connect.LOGIN, defUser)
|
||||
.addHeader(Stomp.Headers.Connect.PASSCODE, defPass)
|
||||
.addHeader(Stomp.Headers.Connect.REQUEST_ID, "1");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT).addHeader(Stomp.Headers.Connect.LOGIN, defUser).addHeader(Stomp.Headers.Connect.PASSCODE, defPass).addHeader(Stomp.Headers.Connect.REQUEST_ID, "1");
|
||||
ClientStompFrame response = conn.sendFrame(frame);
|
||||
|
||||
Assert.assertTrue(response.getCommand()
|
||||
.equals(Stomp.Responses.CONNECTED));
|
||||
Assert.assertTrue(response.getHeader(Stomp.Headers.Connected.RESPONSE_ID)
|
||||
.equals("1"));
|
||||
Assert.assertTrue(response.getCommand().equals(Stomp.Responses.CONNECTED));
|
||||
Assert.assertTrue(response.getHeader(Stomp.Headers.Connected.RESPONSE_ID).equals("1"));
|
||||
|
||||
conn.disconnect();
|
||||
|
||||
|
@ -285,12 +273,13 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
// Receive STOMP Message
|
||||
ClientStompFrame frame = conn.receiveFrame();
|
||||
assertTrue(frame.getBody()
|
||||
.contains(payload));
|
||||
assertTrue(frame.getBody().contains(payload));
|
||||
|
||||
}
|
||||
|
||||
public void sendMessageToNonExistentQueue(String queuePrefix, String queue, RoutingType routingType) throws Exception {
|
||||
public void sendMessageToNonExistentQueue(String queuePrefix,
|
||||
String queue,
|
||||
RoutingType routingType) throws Exception {
|
||||
conn.connect(defUser, defPass);
|
||||
send(conn, queuePrefix + queue, null, "Hello World", true, routingType);
|
||||
|
||||
|
@ -325,7 +314,9 @@ public class StompTest extends StompTestBase {
|
|||
sendMessageToNonExistentQueue(getQueuePrefix(), nonExistentQueue, null);
|
||||
}
|
||||
|
||||
public void sendMessageToNonExistentTopic(String topicPrefix, String topic, RoutingType routingType) throws Exception {
|
||||
public void sendMessageToNonExistentTopic(String topicPrefix,
|
||||
String topic,
|
||||
RoutingType routingType) throws Exception {
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
// first send a message to ensure that sending to a non-existent topic won't throw an error
|
||||
|
@ -350,8 +341,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
// closing the consumer here should trigger auto-deletion of the subscription queue and address
|
||||
consumer.close();
|
||||
Thread.sleep(200);
|
||||
assertNull(server.getAddressInfo(new SimpleString(topic)));
|
||||
Wait.assertTrue(() -> server.getAddressInfo(new SimpleString(topic)) == null);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -379,9 +369,7 @@ public class StompTest extends StompTestBase {
|
|||
@Test
|
||||
public void testSendMessageWithLeadingNewLine() throws Exception {
|
||||
conn.connect(defUser, defPass);
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.setBody("Hello World");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).setBody("Hello World");
|
||||
conn.sendWickedFrame(frame);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
@ -430,10 +418,7 @@ public class StompTest extends StompTestBase {
|
|||
baos.write(data);
|
||||
baos.flush();
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length))
|
||||
.setBody(new String(baos.toByteArray()));
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length)).setBody(new String(baos.toByteArray()));
|
||||
conn.sendFrame(frame);
|
||||
|
||||
BytesMessage message = (BytesMessage) consumer.receive(10000);
|
||||
|
@ -452,10 +437,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader("JMSXGroupID", jmsxGroupID)
|
||||
.setBody("Hello World");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader("JMSXGroupID", jmsxGroupID).setBody("Hello World");
|
||||
conn.sendFrame(frame);
|
||||
|
||||
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||
|
@ -472,11 +454,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader("foo", "abc")
|
||||
.addHeader("bar", "123")
|
||||
.setBody("Hello World");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader("foo", "abc").addHeader("bar", "123").setBody("Hello World");
|
||||
conn.sendFrame(frame);
|
||||
|
||||
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||
|
@ -493,11 +471,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader("foo", "abc")
|
||||
.addHeader("b-ar", "123")
|
||||
.setBody("Hello World");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader("foo", "abc").addHeader("b-ar", "123").setBody("Hello World");
|
||||
conn.sendFrame(frame);
|
||||
|
||||
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||
|
@ -514,16 +488,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader("foo", "abc")
|
||||
.addHeader("bar", "123")
|
||||
.addHeader("correlation-id", "c123")
|
||||
.addHeader("persistent", "true")
|
||||
.addHeader("type", "t345")
|
||||
.addHeader("JMSXGroupID", "abc")
|
||||
.addHeader("priority", "3")
|
||||
.setBody("Hello World");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader("foo", "abc").addHeader("bar", "123").addHeader("correlation-id", "c123").addHeader("persistent", "true").addHeader("type", "t345").addHeader("JMSXGroupID", "abc").addHeader("priority", "3").setBody("Hello World");
|
||||
conn.sendFrame(frame);
|
||||
|
||||
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||
|
@ -552,17 +517,7 @@ public class StompTest extends StompTestBase {
|
|||
buffer.append("a");
|
||||
}
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader("foo", "abc")
|
||||
.addHeader("bar", "123")
|
||||
.addHeader("correlation-id", "c123")
|
||||
.addHeader("persistent", "true")
|
||||
.addHeader("type", "t345")
|
||||
.addHeader("JMSXGroupID", "abc")
|
||||
.addHeader("priority", "3")
|
||||
.addHeader("longHeader", buffer.toString())
|
||||
.setBody("Hello World");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader("foo", "abc").addHeader("bar", "123").addHeader("correlation-id", "c123").addHeader("persistent", "true").addHeader("type", "t345").addHeader("JMSXGroupID", "abc").addHeader("priority", "3").addHeader("longHeader", buffer.toString()).setBody("Hello World");
|
||||
conn.sendFrame(frame);
|
||||
|
||||
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||
|
@ -573,8 +528,7 @@ public class StompTest extends StompTestBase {
|
|||
Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
|
||||
Assert.assertEquals(javax.jms.DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
|
||||
Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
|
||||
Assert.assertEquals("longHeader", 1024, message.getStringProperty("longHeader")
|
||||
.length());
|
||||
Assert.assertEquals("longHeader", 1024, message.getStringProperty("longHeader").length());
|
||||
|
||||
Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
|
||||
}
|
||||
|
@ -585,20 +539,10 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader("foo", "abc")
|
||||
.addHeader("bar", "123")
|
||||
.addHeader("correlation-id", "c123")
|
||||
.addHeader("persistent", "true")
|
||||
.addHeader("type", "t345")
|
||||
.addHeader("JMSXGroupID", "abc")
|
||||
.addHeader("priority", "3")
|
||||
.addHeader("AMQ_SCHEDULED_DELAY", "2000")
|
||||
.setBody("Hello World");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader("foo", "abc").addHeader("bar", "123").addHeader("correlation-id", "c123").addHeader("persistent", "true").addHeader("type", "t345").addHeader("JMSXGroupID", "abc").addHeader("priority", "3").addHeader("AMQ_SCHEDULED_DELAY", "2000").setBody("Hello World");
|
||||
conn.sendFrame(frame);
|
||||
|
||||
assertNull("Should not receive message yet", consumer.receive(1000));
|
||||
assertNull("Should not receive message yet", consumer.receive(100));
|
||||
|
||||
TextMessage message = (TextMessage) consumer.receive(4000);
|
||||
Assert.assertNotNull(message);
|
||||
|
@ -617,20 +561,10 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader("foo", "abc")
|
||||
.addHeader("bar", "123")
|
||||
.addHeader("correlation-id", "c123")
|
||||
.addHeader("persistent", "true")
|
||||
.addHeader("type", "t345")
|
||||
.addHeader("JMSXGroupID", "abc")
|
||||
.addHeader("priority", "3")
|
||||
.addHeader("AMQ_SCHEDULED_TIME", Long.toString(System.currentTimeMillis() + 2000))
|
||||
.setBody("Hello World");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader("foo", "abc").addHeader("bar", "123").addHeader("correlation-id", "c123").addHeader("persistent", "true").addHeader("type", "t345").addHeader("JMSXGroupID", "abc").addHeader("priority", "3").addHeader("AMQ_SCHEDULED_TIME", Long.toString(System.currentTimeMillis() + 2000)).setBody("Hello World");
|
||||
conn.sendFrame(frame);
|
||||
|
||||
assertNull("Should not receive message yet", consumer.receive(1000));
|
||||
assertNull("Should not receive message yet", consumer.receive(100));
|
||||
|
||||
TextMessage message = (TextMessage) consumer.receive(4000);
|
||||
Assert.assertNotNull(message);
|
||||
|
@ -649,13 +583,10 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader("AMQ_SCHEDULED_DELAY", "foo")
|
||||
.setBody("Hello World");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader("AMQ_SCHEDULED_DELAY", "foo").setBody("Hello World");
|
||||
conn.sendFrame(frame);
|
||||
|
||||
assertNull("Should not receive message yet", consumer.receive(1000));
|
||||
assertNull("Should not receive message yet", consumer.receive(100));
|
||||
|
||||
ClientStompFrame error = conn.receiveFrame();
|
||||
|
||||
|
@ -669,13 +600,10 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader("AMQ_SCHEDULED_TIME", "foo")
|
||||
.setBody("Hello World");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader("AMQ_SCHEDULED_TIME", "foo").setBody("Hello World");
|
||||
conn.sendFrame(frame);
|
||||
|
||||
assertNull("Should not receive message yet", consumer.receive(1000));
|
||||
assertNull("Should not receive message yet", consumer.receive(100));
|
||||
|
||||
ClientStompFrame error = conn.receiveFrame();
|
||||
|
||||
|
@ -704,7 +632,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
// message should not be received as it was auto-acked
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(1000);
|
||||
Message message = consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
|
||||
}
|
||||
|
@ -908,7 +836,6 @@ public class StompTest extends StompTestBase {
|
|||
producer.send(ignoredMessage);
|
||||
producer.send(realMessage);
|
||||
|
||||
|
||||
ClientStompFrame frame = conn.receiveFrame(10000);
|
||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
||||
Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
|
||||
|
@ -931,7 +858,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
// message should not be received since message was acknowledged by the client
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(1000);
|
||||
Message message = consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
@ -1008,7 +935,6 @@ public class StompTest extends StompTestBase {
|
|||
Assert.assertEquals("shouldBeNextMessage", frame.getBody());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUnsubscribe() throws Exception {
|
||||
conn.connect(defUser, defPass);
|
||||
|
@ -1027,7 +953,7 @@ public class StompTest extends StompTestBase {
|
|||
// send a message to our queue
|
||||
sendJmsMessage("second message");
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
}
|
||||
|
@ -1050,7 +976,7 @@ public class StompTest extends StompTestBase {
|
|||
// send a message to our queue
|
||||
sendJmsMessage("second message");
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
|
@ -1133,19 +1059,15 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
subscribeTopic(conn, null, null, null, true);
|
||||
|
||||
assertTrue("Subscription queue should be created here", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisfied() throws Exception {
|
||||
int length = server.getActiveMQServerControl().getQueueNames().length;
|
||||
if (length - baselineQueueCount == 1) {
|
||||
return true;
|
||||
} else {
|
||||
log.debug("Queue count: " + (length - baselineQueueCount));
|
||||
return false;
|
||||
}
|
||||
Wait.assertTrue("Subscription queue should be created here", () -> {
|
||||
int length = server.getActiveMQServerControl().getQueueNames().length;
|
||||
if (length - baselineQueueCount == 1) {
|
||||
return true;
|
||||
} else {
|
||||
log.debug("Queue count: " + (length - baselineQueueCount));
|
||||
return false;
|
||||
}
|
||||
}, TimeUnit.SECONDS.toMillis(10), TimeUnit.MILLISECONDS.toMillis(100)));
|
||||
});
|
||||
|
||||
sendJmsMessage(getName(), topic);
|
||||
|
||||
|
@ -1158,7 +1080,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
sendJmsMessage(getName(), topic);
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
|
@ -1174,16 +1096,13 @@ public class StompTest extends StompTestBase {
|
|||
conn.connect(defUser, defPass);
|
||||
subscribe(conn, null, null, null, true);
|
||||
|
||||
assertFalse("Queue should not be created here", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisfied() throws Exception {
|
||||
if (server.getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
Wait.assertFalse("Queue should not be created here", () -> {
|
||||
if (server.getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}, TimeUnit.MILLISECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
|
||||
});
|
||||
|
||||
sendJmsMessage(getName(), queue);
|
||||
|
||||
|
@ -1196,7 +1115,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
sendJmsMessage(getName(), queue);
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
|
@ -1223,15 +1142,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
final Queue subscription = ((LocalQueueBinding) server.getPostOffice().getBinding(new SimpleString(nonExistentQueue))).getQueue();
|
||||
|
||||
assertTrue(Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisfied() throws Exception {
|
||||
if (subscription.getMessageCount() == 0)
|
||||
return true;
|
||||
else
|
||||
return false;
|
||||
}
|
||||
}, 1000, 50));
|
||||
Wait.assertEquals(0, subscription::getMessageCount);
|
||||
|
||||
unsubscribe(conn, null, getQueuePrefix() + nonExistentQueue, true, false);
|
||||
|
||||
|
@ -1239,7 +1150,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
sendJmsMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue));
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
|
@ -1356,22 +1267,15 @@ public class StompTest extends StompTestBase {
|
|||
String subId = UUID.randomUUID().toString();
|
||||
String durableSubName = UUID.randomUUID().toString();
|
||||
String receipt = UUID.randomUUID().toString();
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/test.foo")
|
||||
.addHeader(Stomp.Headers.Unsubscribe.ID, subId)
|
||||
.addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)
|
||||
.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableSubName)
|
||||
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE).addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/test.foo").addHeader(Stomp.Headers.Unsubscribe.ID, subId).addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL).addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableSubName).addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);
|
||||
|
||||
frame = conn.sendFrame(frame);
|
||||
assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
|
||||
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));
|
||||
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null);
|
||||
|
||||
receipt = UUID.randomUUID().toString();
|
||||
frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Unsubscribe.ID, subId)
|
||||
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);
|
||||
frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE).addHeader(Stomp.Headers.Unsubscribe.ID, subId).addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);
|
||||
|
||||
frame = conn.sendFrame(frame);
|
||||
assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
|
||||
|
@ -1379,7 +1283,7 @@ public class StompTest extends StompTestBase {
|
|||
conn.disconnect();
|
||||
|
||||
// make sure the durable subscription queue is still there
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));
|
||||
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1410,7 +1314,7 @@ public class StompTest extends StompTestBase {
|
|||
// send a message on the same connection => it should not be received is noLocal = true on subscribe
|
||||
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
||||
|
||||
ClientStompFrame frame = conn.receiveFrame(2000);
|
||||
ClientStompFrame frame = conn.receiveFrame(100);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
|
@ -1445,7 +1349,7 @@ public class StompTest extends StompTestBase {
|
|||
assertEquals(Stomp.Responses.RECEIPT, response.getCommand());
|
||||
|
||||
// ...and nothing else
|
||||
ClientStompFrame frame = conn.receiveFrame(2000);
|
||||
ClientStompFrame frame = conn.receiveFrame(100);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull(frame);
|
||||
|
||||
|
@ -1470,7 +1374,7 @@ public class StompTest extends StompTestBase {
|
|||
ack(conn, null, messageID, "tx1");
|
||||
abortTransaction(conn, "tx1");
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
Assert.assertNull("No message should have been received as the message was acked even though the transaction has been aborted", frame);
|
||||
|
||||
unsubscribe(conn, null, getQueuePrefix() + getQueueName(), false, false);
|
||||
|
@ -1482,6 +1386,8 @@ public class StompTest extends StompTestBase {
|
|||
@Test
|
||||
public void testMultiProtocolConsumers() throws Exception {
|
||||
final int TIME_OUT = 2000;
|
||||
// a timeout for when we expect negative results (like receive==null)
|
||||
final int NEGATIVE_TIME_OUT = 100;
|
||||
|
||||
int count = 1000;
|
||||
|
||||
|
@ -1512,7 +1418,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
sendJmsMessage(getName(), topic);
|
||||
|
||||
ClientStompFrame frame = conn.receiveFrame(TIME_OUT);
|
||||
ClientStompFrame frame = conn.receiveFrame(NEGATIVE_TIME_OUT);
|
||||
log.debug("Received frame: " + frame);
|
||||
Assert.assertNull("No message should have been received since subscription was removed", frame);
|
||||
|
||||
|
@ -1593,9 +1499,7 @@ public class StompTest extends StompTestBase {
|
|||
} else {
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, PREFIXED_ADDRESS)
|
||||
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE).addHeader(Stomp.Headers.Subscribe.DESTINATION, PREFIXED_ADDRESS).addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
|
||||
frame = conn.sendFrame(frame);
|
||||
|
||||
|
@ -1615,6 +1519,7 @@ public class StompTest extends StompTestBase {
|
|||
/**
|
||||
* This test and testPrefixedAutoCreatedMulticastAndAnycastWithSameName are basically the same but doing the
|
||||
* operations in opposite order. In this test the anycast subscription is created first.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
|
@ -1630,9 +1535,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
// since this queue doesn't exist the broker should create a new ANYCAST address & queue
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS)
|
||||
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE).addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS).addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
frame = conn.sendFrame(frame);
|
||||
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
|
||||
|
||||
|
@ -1650,7 +1553,7 @@ public class StompTest extends StompTestBase {
|
|||
assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
|
||||
|
||||
// however, no message should be routed to the ANYCAST queue
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
Assert.assertNull(frame);
|
||||
|
||||
// sending a message to the ANYCAST queue, should be received
|
||||
|
@ -1661,7 +1564,7 @@ public class StompTest extends StompTestBase {
|
|||
Assert.assertEquals("Hello World 2", frame.getBody());
|
||||
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
||||
Assert.assertEquals("/queue/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
Assert.assertNull(frame);
|
||||
|
||||
unsubscribe(conn, null, "/queue/" + ADDRESS, true, false);
|
||||
|
@ -1670,9 +1573,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
// now subscribe to the address in a MULTICAST way which will create a MULTICAST queue for the subscription
|
||||
uuid = UUID.randomUUID().toString();
|
||||
frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS)
|
||||
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
frame = conn.createFrame(Stomp.Commands.SUBSCRIBE).addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS).addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
frame = conn.sendFrame(frame);
|
||||
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
|
||||
|
||||
|
@ -1686,7 +1587,7 @@ public class StompTest extends StompTestBase {
|
|||
Assert.assertEquals("Hello World 3", frame.getBody());
|
||||
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
||||
Assert.assertEquals("/topic/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
Assert.assertNull(frame);
|
||||
|
||||
unsubscribe(conn, null, "/topic/" + ADDRESS, true, false);
|
||||
|
@ -1697,6 +1598,7 @@ public class StompTest extends StompTestBase {
|
|||
/**
|
||||
* This test and testPrefixedAutoCreatedMulticastAndAnycastWithSameName are basically the same but doing the
|
||||
* operations in opposite order. In this test the multicast subscription is created first.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
|
@ -1712,9 +1614,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
// since this queue doesn't exist the broker should create a new MULTICAST address
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS)
|
||||
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE).addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS).addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
frame = conn.sendFrame(frame);
|
||||
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
|
||||
|
||||
|
@ -1732,7 +1632,7 @@ public class StompTest extends StompTestBase {
|
|||
assertNotNull(server.locateQueue(SimpleString.toSimpleString(ADDRESS)));
|
||||
|
||||
// however, no message should be routed to the MULTICAST queue
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
Assert.assertNull(frame);
|
||||
|
||||
// sending a message to the MULTICAST queue, should be received
|
||||
|
@ -1743,7 +1643,7 @@ public class StompTest extends StompTestBase {
|
|||
Assert.assertEquals("Hello World 2", frame.getBody());
|
||||
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
||||
Assert.assertEquals("/topic/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
Assert.assertNull(frame);
|
||||
|
||||
frame = unsubscribe(conn, null, "/topic/" + ADDRESS, true, false);
|
||||
|
@ -1751,9 +1651,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
// now subscribe to the address in an ANYCAST way
|
||||
uuid = UUID.randomUUID().toString();
|
||||
frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS)
|
||||
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
frame = conn.createFrame(Stomp.Commands.SUBSCRIBE).addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS).addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
frame = conn.sendFrame(frame);
|
||||
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
|
||||
|
||||
|
@ -1763,7 +1661,7 @@ public class StompTest extends StompTestBase {
|
|||
Assert.assertEquals("Hello World 1", frame.getBody());
|
||||
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
||||
Assert.assertEquals("/queue/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||
frame = conn.receiveFrame(2000);
|
||||
frame = conn.receiveFrame(100);
|
||||
Assert.assertNull(frame);
|
||||
|
||||
unsubscribe(conn, null, "/queue/" + ADDRESS, true, false);
|
||||
|
@ -1804,9 +1702,7 @@ public class StompTest extends StompTestBase {
|
|||
conn.connect(defUser, defPass);
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, PREFIXED_ADDRESS)
|
||||
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE).addHeader(Stomp.Headers.Subscribe.DESTINATION, PREFIXED_ADDRESS).addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
|
||||
frame = conn.sendFrame(frame);
|
||||
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
|
||||
|
@ -1839,20 +1735,14 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType)
|
||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, destination)
|
||||
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE).addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType).addHeader(Stomp.Headers.Subscribe.DESTINATION, destination).addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
|
||||
frame = conn.sendFrame(frame);
|
||||
assertEquals(Stomp.Responses.ERROR, frame.getCommand());
|
||||
|
||||
uuid = UUID.randomUUID().toString();
|
||||
|
||||
frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, RoutingType.MULTICAST.toString())
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION_TYPE, RoutingType.MULTICAST.toString()).addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()).addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
|
||||
|
||||
frame = conn.sendFrame(frame);
|
||||
assertEquals(Stomp.Responses.ERROR, frame.getCommand());
|
||||
|
@ -1865,11 +1755,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
subscribe(conn, null);
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, ActiveMQDefaultConfiguration.getDefaultManagementAddress().toString())
|
||||
.addHeader(Stomp.Headers.Send.REPLY_TO, getQueuePrefix() + getQueueName())
|
||||
.addHeader(ManagementHelper.HDR_RESOURCE_NAME.toString(), ResourceNames.QUEUE + getQueuePrefix() + getQueueName())
|
||||
.addHeader(ManagementHelper.HDR_ATTRIBUTE.toString(), "Address");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, ActiveMQDefaultConfiguration.getDefaultManagementAddress().toString()).addHeader(Stomp.Headers.Send.REPLY_TO, getQueuePrefix() + getQueueName()).addHeader(ManagementHelper.HDR_RESOURCE_NAME.toString(), ResourceNames.QUEUE + getQueuePrefix() + getQueueName()).addHeader(ManagementHelper.HDR_ATTRIBUTE.toString(), "Address");
|
||||
|
||||
conn.sendFrame(frame);
|
||||
|
||||
|
@ -1892,12 +1778,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
subscribe(conn, null);
|
||||
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||
.addHeader(Stomp.Headers.Send.DESTINATION, ActiveMQDefaultConfiguration.getDefaultManagementAddress().toString())
|
||||
.addHeader(Stomp.Headers.Send.REPLY_TO, getQueuePrefix() + getQueueName())
|
||||
.addHeader(ManagementHelper.HDR_RESOURCE_NAME.toString(), ResourceNames.QUEUE + getQueuePrefix() + getQueueName())
|
||||
.addHeader(ManagementHelper.HDR_OPERATION_NAME.toString(), "countMessages")
|
||||
.setBody("[\"color = 'blue'\"]");
|
||||
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND).addHeader(Stomp.Headers.Send.DESTINATION, ActiveMQDefaultConfiguration.getDefaultManagementAddress().toString()).addHeader(Stomp.Headers.Send.REPLY_TO, getQueuePrefix() + getQueueName()).addHeader(ManagementHelper.HDR_RESOURCE_NAME.toString(), ResourceNames.QUEUE + getQueuePrefix() + getQueueName()).addHeader(ManagementHelper.HDR_OPERATION_NAME.toString(), "countMessages").setBody("[\"color = 'blue'\"]");
|
||||
|
||||
conn.sendFrame(frame);
|
||||
|
||||
|
@ -1931,8 +1812,8 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
send(conn, addressA, null, "Hello World!", true, RoutingType.ANYCAST);
|
||||
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1, 2000, 100));
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() == 0, 2000, 100));
|
||||
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1);
|
||||
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() == 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1952,8 +1833,8 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
send(conn, addressA, null, "Hello World!", true, RoutingType.MULTICAST);
|
||||
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() == 0, 2000, 100));
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 2, 2000, 100));
|
||||
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() == 0);
|
||||
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1975,8 +1856,8 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
send(conn, addressA, null, "Hello World!", true);
|
||||
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1, 2000, 100));
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount() == 2, 2000, 100));
|
||||
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1);
|
||||
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount() == 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1989,14 +1870,11 @@ public class StompTest extends StompTestBase {
|
|||
Assert.assertNull(server.getAddressInfo(simpleQueueName));
|
||||
Assert.assertNull(server.locateQueue(simpleQueueName));
|
||||
|
||||
server.getAddressSettingsRepository().addMatch(queueName, new AddressSettings()
|
||||
.setDefaultAddressRoutingType(RoutingType.ANYCAST)
|
||||
.setDefaultQueueRoutingType(RoutingType.ANYCAST)
|
||||
);
|
||||
server.getAddressSettingsRepository().addMatch(queueName, new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST).setDefaultQueueRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
send(conn, queueName, null, "Hello ANYCAST");
|
||||
|
||||
assertTrue("Address and queue should be created now", Wait.waitFor(() -> (server.getAddressInfo(simpleQueueName) != null) && (server.locateQueue(simpleQueueName) != null), 2000, 200));
|
||||
Wait.assertTrue("Address and queue should be created now", () -> (server.getAddressInfo(simpleQueueName) != null) && (server.locateQueue(simpleQueueName) != null));
|
||||
assertTrue(server.getAddressInfo(simpleQueueName).getRoutingTypes().contains(RoutingType.ANYCAST));
|
||||
assertEquals(RoutingType.ANYCAST, server.locateQueue(simpleQueueName).getRoutingType());
|
||||
}
|
||||
|
@ -2013,13 +1891,11 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
send(conn, queueName, null, "Hello MULTICAST");
|
||||
|
||||
assertTrue("Address should be created now", Wait.waitFor(() -> (server.getAddressInfo(simpleQueueName) != null), 2000, 200));
|
||||
Wait.assertTrue("Address should be created now", () -> (server.getAddressInfo(simpleQueueName) != null));
|
||||
assertTrue(server.getAddressInfo(simpleQueueName).getRoutingTypes().contains(RoutingType.MULTICAST));
|
||||
Assert.assertNull(server.locateQueue(simpleQueueName));
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void directDeliverDisabledOnStomp() throws Exception {
|
||||
String payload = "This is a test message";
|
||||
|
@ -2029,7 +1905,7 @@ public class StompTest extends StompTestBase {
|
|||
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||
|
||||
for (Binding b : iterableOf(server.getPostOffice().getAllBindings().filter(QueueBinding.class::isInstance))) {
|
||||
Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
|
||||
Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding) b).getQueue().isDirectDeliver());
|
||||
}
|
||||
|
||||
// Send MQTT Message
|
||||
|
@ -2040,8 +1916,7 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
// Receive STOMP Message
|
||||
ClientStompFrame frame = conn.receiveFrame();
|
||||
assertTrue(frame.getBody()
|
||||
.contains(payload));
|
||||
assertTrue(frame.getBody().contains(payload));
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ public class StompWithMessageIDTest extends StompTestBase {
|
|||
message = (TextMessage) consumer.receive(1000);
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
message = (TextMessage) consumer.receive(2000);
|
||||
message = (TextMessage) consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
|
||||
conn.disconnect();
|
||||
|
|
|
@ -961,7 +961,7 @@ public class StompV11Test extends StompTestBase {
|
|||
|
||||
//Nack makes the message be dropped.
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(1000);
|
||||
Message message = consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
@ -1041,7 +1041,7 @@ public class StompV11Test extends StompTestBase {
|
|||
|
||||
//Nack makes the message be dropped.
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(1000);
|
||||
Message message = consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
@ -1201,7 +1201,7 @@ public class StompV11Test extends StompTestBase {
|
|||
|
||||
//no messages can be received.
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(1000);
|
||||
Message message = consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
@ -1238,7 +1238,7 @@ public class StompV11Test extends StompTestBase {
|
|||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(10000);
|
||||
Assert.assertNotNull(message);
|
||||
message = consumer.receive(1000);
|
||||
message = consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
@ -1267,7 +1267,7 @@ public class StompV11Test extends StompTestBase {
|
|||
|
||||
//no messages can be received.
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(1000);
|
||||
Message message = consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
@ -1310,7 +1310,7 @@ public class StompV11Test extends StompTestBase {
|
|||
instanceLog.debug("Legal: " + message.getText());
|
||||
}
|
||||
|
||||
message = (TextMessage) consumer.receive(1000);
|
||||
message = (TextMessage) consumer.receive(100);
|
||||
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
@ -1425,7 +1425,7 @@ public class StompV11Test extends StompTestBase {
|
|||
|
||||
abortTransaction(conn, "tx1");
|
||||
|
||||
frame = conn.receiveFrame(500);
|
||||
frame = conn.receiveFrame(100);
|
||||
|
||||
assertNull(frame);
|
||||
|
||||
|
@ -1566,8 +1566,6 @@ public class StompV11Test extends StompTestBase {
|
|||
|
||||
Wait.assertTrue(() -> server.locateQueue(queueName) == null);
|
||||
|
||||
assertNull(server.locateQueue(queueName));
|
||||
|
||||
conn.disconnect();
|
||||
}
|
||||
|
||||
|
@ -1776,7 +1774,7 @@ public class StompV11Test extends StompTestBase {
|
|||
long tmsg = message.getJMSTimestamp();
|
||||
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
|
||||
|
||||
assertNull(consumer.receive(1000));
|
||||
assertNull(consumer.receive(100));
|
||||
|
||||
conn.disconnect();
|
||||
}
|
||||
|
@ -1894,7 +1892,7 @@ public class StompV11Test extends StompTestBase {
|
|||
|
||||
sendJmsMessage(getName(), topic);
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
assertNull(frame);
|
||||
|
||||
conn.disconnect();
|
||||
|
@ -1908,7 +1906,7 @@ public class StompV11Test extends StompTestBase {
|
|||
|
||||
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
||||
|
||||
ClientStompFrame frame = conn.receiveFrame(2000);
|
||||
ClientStompFrame frame = conn.receiveFrame(100);
|
||||
|
||||
assertNull(frame);
|
||||
|
||||
|
@ -1944,7 +1942,7 @@ public class StompV11Test extends StompTestBase {
|
|||
|
||||
// message should not be received as it was auto-acked
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(1000);
|
||||
Message message = consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
@ -1990,7 +1988,7 @@ public class StompV11Test extends StompTestBase {
|
|||
|
||||
// message should not be received since message was acknowledged by the client
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(1000);
|
||||
Message message = consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
@ -2179,7 +2177,7 @@ public class StompV11Test extends StompTestBase {
|
|||
// send a message to our queue
|
||||
sendJmsMessage("second message");
|
||||
|
||||
frame = conn.receiveFrame(1000);
|
||||
frame = conn.receiveFrame(100);
|
||||
assertNull(frame);
|
||||
|
||||
conn.disconnect();
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConne
|
|||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11;
|
||||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -1592,13 +1593,8 @@ public class StompV12Test extends StompTestBase {
|
|||
|
||||
unsubscribe(conn, getName(), null, false, true);
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
|
||||
while (server.locateQueue(queueName) != null && (System.currentTimeMillis() - start) < 5000) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
assertNull(server.locateQueue(queueName));
|
||||
Wait.assertTrue(() -> server.locateQueue(queueName) == null);
|
||||
|
||||
conn.disconnect();
|
||||
}
|
||||
|
@ -1946,7 +1942,7 @@ public class StompV12Test extends StompTestBase {
|
|||
// send a message on the same connection => it should not be received
|
||||
send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
|
||||
|
||||
ClientStompFrame frame = conn.receiveFrame(2000);
|
||||
ClientStompFrame frame = conn.receiveFrame(100);
|
||||
|
||||
Assert.assertNull(frame);
|
||||
|
||||
|
@ -2391,6 +2387,8 @@ public class StompV12Test extends StompTestBase {
|
|||
|
||||
private void internalSubscribeWithZeroConsumerWindowSize(String consumerWindowSizeHeader, boolean ack) throws Exception {
|
||||
final int TIMEOUT = 1000;
|
||||
// to be used when we expect it to be null
|
||||
final int NEGATIVE_TIMEOUT = 100;
|
||||
conn.connect(defUser, defPass);
|
||||
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL, null, null, getQueuePrefix() + getQueueName(), true, 0, consumerWindowSizeHeader);
|
||||
|
||||
|
@ -2400,7 +2398,7 @@ public class StompV12Test extends StompTestBase {
|
|||
Assert.assertNotNull(frame1);
|
||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
|
||||
String messageID = frame1.getHeader(Stomp.Headers.Message.MESSAGE_ID);
|
||||
ClientStompFrame frame2 = conn.receiveFrame(TIMEOUT);
|
||||
ClientStompFrame frame2 = conn.receiveFrame(NEGATIVE_TIMEOUT);
|
||||
Assert.assertNull(frame2);
|
||||
if (ack) {
|
||||
ack(conn, messageID);
|
||||
|
@ -2421,7 +2419,7 @@ public class StompV12Test extends StompTestBase {
|
|||
conn.disconnect();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(TIMEOUT);
|
||||
Message message = consumer.receive(100);
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
@ -2465,7 +2463,7 @@ public class StompV12Test extends StompTestBase {
|
|||
Assert.assertNotNull(frame2);
|
||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
|
||||
String messageID2 = frame2.getHeader(Stomp.Headers.Message.MESSAGE_ID);
|
||||
ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
|
||||
ClientStompFrame frame3 = conn.receiveFrame(100);
|
||||
Assert.assertNull(frame3);
|
||||
if (ack) {
|
||||
ack(conn, messageID1);
|
||||
|
@ -2488,7 +2486,7 @@ public class StompV12Test extends StompTestBase {
|
|||
conn.disconnect();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(TIMEOUT);
|
||||
Message message = consumer.receiveNoWait();
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
@ -2515,7 +2513,7 @@ public class StompV12Test extends StompTestBase {
|
|||
Assert.assertNotNull(frame2);
|
||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
|
||||
String messageID2 = frame2.getHeader(Stomp.Headers.Message.MESSAGE_ID);
|
||||
ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
|
||||
ClientStompFrame frame3 = conn.receiveFrame(100);
|
||||
Assert.assertNull(frame3);
|
||||
// this should clear the first 2 messages since we're using CLIENT ack mode
|
||||
ack(conn, messageID2);
|
||||
|
@ -2533,7 +2531,7 @@ public class StompV12Test extends StompTestBase {
|
|||
conn.disconnect();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(TIMEOUT);
|
||||
Message message = consumer.receiveNoWait();
|
||||
Assert.assertNull(message);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue