This commit is contained in:
Clebert Suconic 2019-11-12 10:24:05 -05:00
commit 7168cc195b
3 changed files with 78 additions and 2 deletions

View File

@ -1249,6 +1249,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
scaledDownNodeIDs.clear();
connectedClientIds.clear();
for (ActiveMQComponent externalComponent : externalComponents) {
try {
if (externalComponent instanceof ServiceComponent) {

View File

@ -75,7 +75,7 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return "";
}
protected URI getBrokerQpidJMSConnectionURI() {
protected String getBrokerQpidJMSConnectionString() {
try {
int port = AMQP_PORT;
@ -100,7 +100,23 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
uri = uri + "?" + getJmsConnectionURIOptions();
}
return new URI(uri);
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected URI getBrokerQpidJMSConnectionURI() {
try {
return new URI(getBrokerQpidJMSConnectionString());
} catch (Exception e) {
throw new RuntimeException();
}
}
protected URI getBrokerQpidJMSFailoverConnectionURI() {
try {
return new URI("failover:(" + getBrokerQpidJMSConnectionString() + ")");
} catch (Exception e) {
throw new RuntimeException();
}
@ -110,6 +126,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, true);
}
protected Connection createFailoverConnection() throws JMSException {
return createConnection(getBrokerQpidJMSFailoverConnectionURI(), null, null, null, true);
}
protected Connection createConnection(boolean start) throws JMSException {
return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, start);
}

View File

@ -866,4 +866,58 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
assertFalse(failedToSubscribe.get());
}
@Test(timeout = 30000)
public void testBrokerRestartAMQPProducerAMQPConsumer() throws Exception {
Connection connection = createFailoverConnection(); //AMQP
Connection connection2 = createFailoverConnection(); //AMQP
testBrokerRestart(connection, connection2);
}
private void testBrokerRestart(Connection connection1, Connection connection2) throws Exception {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue1 = session1.createQueue(getQueueName());
javax.jms.Queue queue2 = session2.createQueue(getQueueName());
final MessageConsumer consumer2 = session2.createConsumer(queue2);
MessageProducer producer = session1.createProducer(queue1);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message received = consumer2.receive(100);
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());
server.stop();
Wait.waitFor(() -> !server.isStarted(), 1000);
server.start();
TextMessage message2 = session1.createTextMessage();
message2.setText("hello");
producer.send(message2);
Message received2 = consumer2.receive(100);
assertNotNull("Should have received a message by now.", received2);
assertTrue("Should be an instance of TextMessage", received2 instanceof TextMessage);
assertEquals(DeliveryMode.PERSISTENT, received2.getJMSDeliveryMode());
} finally {
connection1.close();
connection2.close();
}
}
}