ARTEMIS-1818 re-create auto-created queue on JMS reconnect
This commit is contained in:
parent
9b13f5c5bd
commit
d6adc2950a
|
@ -766,11 +766,10 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
boolean isSessionStarted) throws ActiveMQException {
|
boolean isSessionStarted) throws ActiveMQException {
|
||||||
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
|
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
|
||||||
|
|
||||||
// We try and recreate any non durable queues, since they probably won't be there unless
|
// We try to recreate any non-durable or auto-created queues, since they might not be there on failover/reconnect.
|
||||||
// they are defined in broker.xml
|
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover/reconnection
|
||||||
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
|
if (!queueInfo.isDurable() || queueInfo.isAutoCreated()) {
|
||||||
if (!queueInfo.isDurable()) {
|
CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isLastValue());
|
||||||
CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isLastValue());
|
|
||||||
|
|
||||||
sendPacketWithoutLock(sessionChannel, createQueueRequest);
|
sendPacketWithoutLock(sessionChannel, createQueueRequest);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||||
import org.apache.activemq.artemis.core.security.Role;
|
import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
|
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
|
||||||
|
import org.apache.activemq.artemis.junit.Wait;
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||||
|
@ -242,6 +243,28 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
|
||||||
assertNull(server.locateQueue(topicAddress));
|
assertNull(server.locateQueue(topicAddress));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoCreateOnReconnect() throws Exception {
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
javax.jms.Queue queue = ActiveMQJMSClient.createQueue(QUEUE_NAME);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
producer.send(session.createMessage());
|
||||||
|
assertNotNull(consumer.receive(500));
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
waitForServerToStart(server);
|
||||||
|
// wait for client to reconnect
|
||||||
|
assertTrue(Wait.waitFor(() -> server.getTotalConsumerCount() == 1, 3000, 100));
|
||||||
|
producer.send(session.createMessage());
|
||||||
|
assertNotNull(consumer.receive(500));
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
@Override
|
@Override
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue