mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-08 02:59:14 +00:00
ARTEMIS-2238 Fixing QueueQuery on every single send on topics
This commit is contained in:
parent
f76a4a9afb
commit
90a66266b6
@ -423,16 +423,18 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
|
|||||||
throw new InvalidDestinationException("Destination " + address + " does not exist");
|
throw new InvalidDestinationException("Destination " + address + " does not exist");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
|
if (destination.isQueue()) {
|
||||||
if (queueQuery.isExists()) {
|
ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
|
||||||
connection.addKnownDestination(address);
|
if (!queueQuery.isExists()) {
|
||||||
} else if (destination.isQueue() && query.isAutoCreateQueues()) {
|
if (destination.isTemporary()) {
|
||||||
if (destination.isTemporary()) {
|
session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, query);
|
||||||
session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, query);
|
} else {
|
||||||
} else {
|
session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query);
|
||||||
session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query);
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connection.addKnownDestination(address);
|
||||||
}
|
}
|
||||||
} catch (ActiveMQQueueExistsException e) {
|
} catch (ActiveMQQueueExistsException e) {
|
||||||
// The queue was created by another client/admin between the query check and send create queue packet
|
// The queue was created by another client/admin between the query check and send create queue packet
|
||||||
|
@ -24,7 +24,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
|
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -32,6 +34,7 @@ import org.junit.Before;
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
@ -39,6 +42,7 @@ import javax.jms.MessageProducer;
|
|||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
import javax.jms.Topic;
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
@ -58,7 +62,7 @@ public class QueueAutoCreationTest extends JMSClientTestSupport {
|
|||||||
String randomSuffix = new BigInteger(130, random).toString(32);
|
String randomSuffix = new BigInteger(130, random).toString(32);
|
||||||
testConn = (ActiveMQConnection)createCoreConnection();
|
testConn = (ActiveMQConnection)createCoreConnection();
|
||||||
clientSession = testConn.getSessionFactory().createSession();
|
clientSession = testConn.getSessionFactory().createSession();
|
||||||
queue1 = createQueue("queue1_" + randomSuffix);
|
queue1 = createAddressOnlyAndFakeQueue("queue1_" + randomSuffix);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -89,7 +93,7 @@ public class QueueAutoCreationTest extends JMSClientTestSupport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected Queue createQueue(final String queueName) throws Exception {
|
protected Queue createAddressOnlyAndFakeQueue(final String queueName) throws Exception {
|
||||||
SimpleString address = SimpleString.toSimpleString(queueName);
|
SimpleString address = SimpleString.toSimpleString(queueName);
|
||||||
clientSession.createAddress(address, RoutingType.ANYCAST, false);
|
clientSession.createAddress(address, RoutingType.ANYCAST, false);
|
||||||
return new ActiveMQQueue(queueName);
|
return new ActiveMQQueue(queueName);
|
||||||
@ -109,6 +113,26 @@ public class QueueAutoCreationTest extends JMSClientTestSupport {
|
|||||||
sendStringOfSize(1024 * 1024, true);
|
sendStringOfSize(1024 * 1024, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
// QueueAutoCreationTest was created to validate auto-creation of queues
|
||||||
|
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
|
||||||
|
public void testAutoCreateOnTopic() throws Exception {
|
||||||
|
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:5672");
|
||||||
|
Connection connection = factory.createConnection();
|
||||||
|
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
|
||||||
|
System.out.println("Address is " + addressName);
|
||||||
|
clientSession.createAddress(addressName, RoutingType.ANYCAST, false);
|
||||||
|
Topic topic = new ActiveMQTopic(addressName.toString());
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = session.createProducer(topic);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
producer.send(session.createTextMessage("hello"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(((ActiveMQConnection)connection).containsKnownDestination(addressName));
|
||||||
|
}
|
||||||
|
|
||||||
private void sendStringOfSize(int msgSize, boolean useCoreReceive) throws JMSException {
|
private void sendStringOfSize(int msgSize, boolean useCoreReceive) throws JMSException {
|
||||||
|
|
||||||
Connection conn = this.createConnection();
|
Connection conn = this.createConnection();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user