ARTEMIS-1416 Queue is not autocreated if address already exists

- Fix on core and amqp.
- Add test to verify amqp's current large message behavior.
- Add test to openwire also just to verify.
This commit is contained in:
Howard Gao 2017-10-30 12:47:23 +08:00 committed by Clebert Suconic
parent b0c83073e2
commit f3ace6afd7
4 changed files with 343 additions and 10 deletions

View File

@ -420,7 +420,16 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
throw new InvalidDestinationException("Destination " + address + " does not exist");
}
} else {
connection.addKnownDestination(address);
ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
if (queueQuery.isExists()) {
connection.addKnownDestination(address);
} else if (destination.isQueue() && query.isAutoCreateQueues()) {
if (destination.isTemporary()) {
clientSession.createTemporaryQueue(address, RoutingType.ANYCAST, address);
} else {
clientSession.createQueue(address, RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), query.isDefaultPurgeOnNoConsumers());
}
}
}
} catch (ActiveMQQueueExistsException e) {
// The queue was created by another client/admin between the query check and send create queue packet

View File

@ -284,11 +284,14 @@ public class AMQPSessionCallback implements SessionCallback {
// The address may have been created by another thread in the mean time. Catch and do nothing.
}
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
} else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
try {
serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
} else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) {
QueueQueryResult queueBinding = serverSession.executeQueueQuery(simpleAddress);
if (!queueBinding.isExists()) {
try {
serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
}
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
}
@ -394,14 +397,16 @@ public class AMQPSessionCallback implements SessionCallback {
message.setAddress(new SimpleString(address));
} else {
// Anonymous relay must set a To value
if (message.getAddress() == null) {
address = message.getAddress();
if (address == null) {
rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
return;
}
}
if (!bindingQuery(message.getAddress().toString(), RoutingType.ANYCAST)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
//here check queue-autocreation
if (!bindingQuery(address, RoutingType.ANYCAST)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
OperationContext oldcontext = recoverContext();

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.math.BigInteger;
import java.util.Map;
import java.util.Random;
//adapted from https://issues.apache.org/jira/browse/ARTEMIS-1416
public class QueueAutoCreationTest extends JMSClientTestSupport {
Queue queue1;
Random random = new Random();
ActiveMQConnection testConn;
ClientSession clientSession;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
String randomSuffix = new BigInteger(130, random).toString(32);
testConn = (ActiveMQConnection)createCoreConnection();
clientSession = testConn.getSessionFactory().createSession();
queue1 = createQueue("queue1_" + randomSuffix);
}
@Override
@After
public void tearDown() throws Exception {
testConn.close();
super.tearDown();
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,CORE";
}
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
Configuration serverConfig = server.getConfiguration();
serverConfig.setJournalType(JournalType.NIO);
Map<String, AddressSettings> map = serverConfig.getAddressesSettings();
if (map.size() == 0) {
AddressSettings as = new AddressSettings();
map.put("#", as);
}
Map.Entry<String, AddressSettings> entry = map.entrySet().iterator().next();
AddressSettings settings = entry.getValue();
settings.setAutoCreateQueues(true);
System.out.println("server cofg, isauto? " + entry.getValue().isAutoCreateQueues());
}
protected Queue createQueue(final String queueName) throws Exception {
SimpleString address = SimpleString.toSimpleString(queueName);
clientSession.createAddress(address, RoutingType.ANYCAST, false);
return new ActiveMQQueue(queueName);
}
@Test(timeout = 30000)
public void testSmallString() throws Exception {
sendStringOfSize(1024, false);
}
@Test(timeout = 30000)
public void testHugeString() throws Exception {
//amqp doesn't support large message receive.
//using core to receive, it can verify
//that the large message is indeed stored in core
//via amqp send.
sendStringOfSize(1024 * 1024, true);
}
private void sendStringOfSize(int msgSize, boolean useCoreReceive) throws JMSException {
Connection conn = this.createConnection();
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = session.createProducer(queue1);
TextMessage m = session.createTextMessage();
m.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
StringBuffer buffer = new StringBuffer();
while (buffer.length() < msgSize) {
buffer.append(UUIDGenerator.getInstance().generateStringUUID());
}
final String originalString = buffer.toString();
m.setText(originalString);
prod.send(m);
conn.close();
if (useCoreReceive) {
conn = createCoreConnection();
} else {
conn = createConnection();
}
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = session.createConsumer(queue1);
conn.start();
TextMessage rm = (TextMessage) cons.receive(5000);
Assert.assertNotNull(rm);
String str = rm.getText();
Assert.assertEquals(originalString, str);
} finally {
if (conn != null) {
conn.close();
}
}
}
}

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
//adapted from https://issues.apache.org/jira/browse/ARTEMIS-1416
@RunWith(Parameterized.class)
public class LargeMessageQueueAutoCreationTest extends BasicOpenWireTest {
Queue queue1;
Random random = new Random();
ActiveMQConnection testConn;
ClientSession clientSession;
@Parameterized.Parameter
public boolean usingCore;
@Parameterized.Parameters(name = "isCore={0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{{true}, {false}});
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
String randomSuffix = new BigInteger(130, random).toString(32);
testConn = (ActiveMQConnection)coreCf.createConnection();
clientSession = testConn.getSessionFactory().createSession();
queue1 = createCoreQueue("queue1_" + randomSuffix);
}
@Override
@After
public void tearDown() throws Exception {
testConn.close();
super.tearDown();
}
@Override
protected void extraServerConfig(Configuration serverConfig) {
serverConfig.setJournalType(JournalType.NIO);
Map<String, AddressSettings> map = serverConfig.getAddressesSettings();
Map.Entry<String, AddressSettings> entry = map.entrySet().iterator().next();
AddressSettings settings = entry.getValue();
settings.setAutoCreateQueues(true);
System.out.println("server cofg, isauto? " + entry.getValue().isAutoCreateQueues());
}
protected Queue createCoreQueue(final String queueName) throws Exception {
SimpleString address = SimpleString.toSimpleString(queueName);
clientSession.createAddress(address, RoutingType.ANYCAST, false);
return new ActiveMQQueue(queueName);
}
@Test(timeout = 30000)
public void testSmallString() throws Exception {
sendStringOfSize(1024);
}
@Test(timeout = 30000)
public void testHugeString() throws Exception {
sendStringOfSize(1024 * 1024);
}
private void sendStringOfSize(int msgSize) throws JMSException {
ConnectionFactory factoryToUse = usingCore ? coreCf : factory;
Connection conn = factoryToUse.createConnection();
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = session.createProducer(queue1);
TextMessage m = session.createTextMessage();
m.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
StringBuffer buffer = new StringBuffer();
while (buffer.length() < msgSize) {
buffer.append(UUIDGenerator.getInstance().generateStringUUID());
}
final String originalString = buffer.toString();
m.setText(originalString);
prod.send(m);
conn.close();
conn = factoryToUse.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = session.createConsumer(queue1);
conn.start();
TextMessage rm = (TextMessage) cons.receive(5000);
Assert.assertNotNull(rm);
String str = rm.getText();
Assert.assertEquals(originalString, str);
} finally {
if (conn != null) {
conn.close();
}
}
}
}