This commit is contained in:
Clebert Suconic 2019-08-20 10:57:24 -04:00
commit 6fc11338e6
6 changed files with 84 additions and 24 deletions

View File

@ -97,8 +97,6 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
private final Set<SimpleString> tempQueues = new ConcurrentHashSet<>();
private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
private volatile boolean hasNoLocal;
private volatile ExceptionListener exceptionListener;
@ -543,20 +541,10 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
public void addTemporaryQueue(final SimpleString queueAddress) {
tempQueues.add(queueAddress);
knownDestinations.add(queueAddress);
}
public void removeTemporaryQueue(final SimpleString queueAddress) {
tempQueues.remove(queueAddress);
knownDestinations.remove(queueAddress);
}
public void addKnownDestination(final SimpleString address) {
knownDestinations.add(address);
}
public boolean containsKnownDestination(final SimpleString address) {
return knownDestinations.contains(address);
}
public boolean containsTemporaryQueue(final SimpleString queueAddress) {

View File

@ -352,6 +352,7 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
private final transient ActiveMQSession session;
private transient boolean created;
// Constructors --------------------------------------------------
protected ActiveMQDestination(final String address,
@ -449,6 +450,7 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
} else {
sessionToUse.deleteTemporaryTopic(this);
}
setCreated(false);
} finally {
if (openedHere) {
sessionToUse.close();
@ -483,6 +485,14 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
return temporary;
}
public boolean isCreated() {
return created;
}
public void setCreated(boolean created) {
this.created = created;
}
public TYPE getType() {
if (thetype == null) {
if (temporary) {

View File

@ -383,7 +383,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
void checkDestination(ActiveMQDestination destination) throws JMSException {
SimpleString address = destination.getSimpleAddress();
// TODO: What to do with FQQN
if (!connection.containsKnownDestination(address)) {
if (!destination.isCreated()) {
try {
ClientSession.AddressQuery addressQuery = session.addressQuery(address);
@ -425,7 +425,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
// this is done at the end, if no exceptions are thrown
connection.addKnownDestination(address);
destination.setCreated(true);
}
}
@ -811,7 +811,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
}
connection.addKnownDestination(dest.getSimpleAddress());
dest.setCreated(true);
consumer = createClientConsumer(dest, null, coreFilterString);
} else {
@ -825,7 +825,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
}
connection.addKnownDestination(dest.getSimpleAddress());
dest.setCreated(true);
SimpleString queueName;
@ -999,6 +999,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
connection.addTemporaryQueue(simpleAddress);
queue.setCreated(true);
return queue;
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -130,7 +131,63 @@ public class QueueAutoCreationTest extends JMSClientTestSupport {
producer.send(session.createTextMessage("hello"));
}
Assert.assertTrue(((ActiveMQConnection)connection).containsKnownDestination(addressName));
Assert.assertTrue(((ActiveMQDestination) topic).isCreated());
}
@Test(timeout = 30000)
// QueueAutoCreationTest was created to validate auto-creation of queues
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
public void testAutoCreateOnTopicManySends() throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:5672");
Connection connection = factory.createConnection();
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
System.out.println("Address is " + addressName);
clientSession.createAddress(addressName, RoutingType.ANYCAST, false);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < 10; i++) {
Topic topic = new ActiveMQTopic(addressName.toString());
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("hello"));
producer.close();
Assert.assertTrue(((ActiveMQDestination) topic).isCreated());
}
}
@Test(timeout = 30000)
// QueueAutoCreationTest was created to validate auto-creation of queues
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
public void testAutoCreateOnTopicAndConsume() throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:5672");
Connection connection = factory.createConnection();
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
System.out.println("Address is " + addressName);
clientSession.createAddress(addressName, RoutingType.ANYCAST, false);
Connection recConnection = factory.createConnection();
Session recSession = recConnection.createSession(Session.AUTO_ACKNOWLEDGE);
Topic topicConsumer = recSession.createTopic(addressName.toString());
MessageConsumer consumer = recSession.createConsumer(topicConsumer);
recConnection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < 10; i++) {
Topic topic = session.createTopic(addressName.toString());
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("hello"));
producer.close();
Assert.assertTrue(((ActiveMQDestination) topic).isCreated());
}
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage)consumer.receive(10_000);
Assert.assertNotNull(message);
Assert.assertEquals("hello", message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
}
private void sendStringOfSize(int msgSize, boolean useCoreReceive) throws JMSException {

View File

@ -41,15 +41,15 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.After;
import org.junit.Assert;
@ -300,7 +300,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
producer.send(session.createTextMessage("hello"));
}
Assert.assertTrue(((ActiveMQConnection)connection).containsKnownDestination(addressName));
Assert.assertTrue((((ActiveMQDestination) topic).isCreated()));
}
@Test (timeout = 30000)
@ -324,7 +324,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
Assert.fail("Expected to throw exception here");
} catch (JMSException expected) {
}
Assert.assertFalse(((ActiveMQConnection)connection).containsKnownDestination(addressName));
Assert.assertFalse(((ActiveMQDestination) queue).isCreated());
}
}
@ -349,7 +349,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
} catch (JMSException expected) {
}
Assert.assertFalse(((ActiveMQConnection)connection).containsKnownDestination(addressName));
Assert.assertFalse(((ActiveMQDestination) queue).isCreated());
}
@ -370,7 +370,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
MessageProducer producer = session.createProducer(queue);
Assert.assertNotNull(server.locateQueue(addressName));
Assert.assertTrue(((ActiveMQConnection) connection).containsKnownDestination(addressName));
Assert.assertTrue(((ActiveMQDestination) queue).isCreated());
}
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
@ -79,9 +80,11 @@ public class TemporaryDestinationTest extends JMSTestBase {
consumer.close();
assertTrue(((ActiveMQDestination) tempQueue).isCreated());
tempQueue.delete();
assertFalse(conn.containsKnownDestination(SimpleString.toSimpleString(tempQueue.getQueueName())));
assertFalse(((ActiveMQDestination) tempQueue).isCreated());
assertFalse(conn.containsTemporaryQueue(SimpleString.toSimpleString(tempQueue.getQueueName())));
} finally {