Fix several hanging Http transport tests

This commit is contained in:
Christopher L. Shannon (cshannon) 2023-08-16 16:25:26 -04:00 committed by Matt Pavlovich
parent 5243a4451c
commit 0cb5b9bb72
4 changed files with 62 additions and 52 deletions

View File

@ -68,12 +68,13 @@ public class HttpMaxFrameSizeTest {
private void send(int size) throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("http://localhost:8888");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue("test"));
String payload = StringUtils.repeat("*", size);
TextMessage textMessage = session.createTextMessage(payload);
producer.send(textMessage);
try(Connection connection = connectionFactory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue("test"));
String payload = StringUtils.repeat("*", size);
TextMessage textMessage = session.createTextMessage(payload);
producer.send(textMessage);
}
}
}

View File

@ -202,6 +202,7 @@ public class HttpPullConsumerTest {
}
protected void stopBroker() throws Exception {
connection.close();
if (broker != null) {
broker.stop();
broker.waitUntilStopped();

View File

@ -289,6 +289,9 @@ public class HttpSendCompressedMessagesTest {
@After
public void shutDown() throws Exception {
tcpConnection.close();
httpConnection.close();
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
@ -306,12 +309,13 @@ public class HttpSendCompressedMessagesTest {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage(builder.toString()));
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage(builder.toString()));
}
}
private void sendBytesMessage(boolean compressed) throws Exception {
@ -325,14 +329,15 @@ public class HttpSendCompressedMessagesTest {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
BytesMessage message = session.createBytesMessage();
message.writeUTF(builder.toString());
producer.send(message);
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
BytesMessage message = session.createBytesMessage();
message.writeUTF(builder.toString());
producer.send(message);
}
}
private void sendStreamMessage(boolean compressed) throws Exception {
@ -346,14 +351,15 @@ public class HttpSendCompressedMessagesTest {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
StreamMessage message = session.createStreamMessage();
message.writeString(builder.toString());
producer.send(message);
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
StreamMessage message = session.createStreamMessage();
message.writeString(builder.toString());
producer.send(message);
}
}
private void sendMapMessage(boolean compressed) throws Exception {
@ -367,13 +373,14 @@ public class HttpSendCompressedMessagesTest {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
MapMessage message = session.createMapMessage();
message.setString("content", builder.toString());
producer.send(message);
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
MapMessage message = session.createMapMessage();
message.setString("content", builder.toString());
producer.send(message);
}
}
}

View File

@ -68,26 +68,27 @@ public class HttpTransportConnectTimeoutTest {
public void testSendReceiveAfterPause() throws Exception {
final CountDownLatch failed = new CountDownLatch(1);
Connection connection = factory.createConnection();
connection.start();
connection.setExceptionListener(new ExceptionListener() {
try (Connection connection = factory.createConnection()) {
connection.start();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
LOG.info("Connection failed due to: {}", exception.getMessage());
failed.countDown();
}
});
@Override
public void onException(JMSException exception) {
LOG.info("Connection failed due to: {}", exception.getMessage());
failed.countDown();
}
});
assertFalse(failed.await(3, TimeUnit.SECONDS));
assertFalse(failed.await(3, TimeUnit.SECONDS));
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
producer.send(session.createMessage());
producer.send(session.createMessage());
assertNotNull(consumer.receive(5000));
assertNotNull(consumer.receive(5000));
}
}
}