This commit is contained in:
Clebert Suconic 2017-11-03 23:51:07 -04:00
commit 07eb1d25be
10 changed files with 394 additions and 21 deletions

View File

@ -420,7 +420,16 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
throw new InvalidDestinationException("Destination " + address + " does not exist");
}
} else {
connection.addKnownDestination(address);
ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
if (queueQuery.isExists()) {
connection.addKnownDestination(address);
} else if (destination.isQueue() && query.isAutoCreateQueues()) {
if (destination.isTemporary()) {
clientSession.createTemporaryQueue(address, RoutingType.ANYCAST, address);
} else {
clientSession.createQueue(address, RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), query.isDefaultPurgeOnNoConsumers());
}
}
}
} catch (ActiveMQQueueExistsException e) {
// The queue was created by another client/admin between the query check and send create queue packet

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
@ -268,9 +269,11 @@ public class AMQPSessionCallback implements SessionCallback {
queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
}
if (queueQueryResult.getRoutingType() != routingType) {
// if auto-create we will return whatever type was used before
if (!queueQueryResult.isAutoCreated() && queueQueryResult.getRoutingType() != routingType) {
throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
}
return queueQueryResult;
}
@ -284,11 +287,14 @@ public class AMQPSessionCallback implements SessionCallback {
// The address may have been created by another thread in the mean time. Catch and do nothing.
}
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
} else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
try {
serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
} else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) {
QueueQueryResult queueBinding = serverSession.executeQueueQuery(simpleAddress);
if (!queueBinding.isExists()) {
try {
serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
}
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
}
@ -383,7 +389,8 @@ public class AMQPSessionCallback implements SessionCallback {
((ServerConsumer) consumer).receiveCredits(-1);
}
public void serverSend(final Transaction transaction,
public void serverSend(final ProtonServerReceiverContext context,
final Transaction transaction,
final Receiver receiver,
final Delivery delivery,
String address,
@ -394,14 +401,17 @@ public class AMQPSessionCallback implements SessionCallback {
message.setAddress(new SimpleString(address));
} else {
// Anonymous relay must set a To value
if (message.getAddress() == null) {
address = message.getAddress();
if (address == null) {
rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
return;
}
}
if (!bindingQuery(message.getAddress().toString(), RoutingType.ANYCAST)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
//here check queue-autocreation
RoutingType routingType = context.getRoutingType(receiver, RoutingType.ANYCAST);
if (!bindingQuery(address, routingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
OperationContext oldcontext = recoverContext();

View File

@ -96,14 +96,17 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
// We don't currently support SECOND so enforce that the answer is anlways FIRST
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
RoutingType defRoutingType;
if (target != null) {
if (target.getDynamic()) {
defRoutingType = getRoutingType(target.getCapabilities());
// if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session
address = sessionSPI.tempQueueName();
try {
sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities()));
sessionSPI.createTemporaryQueue(address, defRoutingType);
} catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(e.getMessage());
} catch (Exception e) {
@ -118,8 +121,9 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
address = target.getAddress();
if (address != null && !address.isEmpty()) {
defRoutingType = getRoutingType(target.getCapabilities());
try {
if (!sessionSPI.bindingQuery(address, getRoutingType(target.getCapabilities()))) {
if (!sessionSPI.bindingQuery(address, defRoutingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
} catch (ActiveMQAMQPNotFoundException e) {
@ -177,7 +181,16 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
flow(amqpCredits, minCreditRefresh);
}
public RoutingType getRoutingType(Receiver receiver, RoutingType defaultType) {
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
return target != null ? getRoutingType(target.getCapabilities(), defaultType) : getRoutingType((Symbol[])null, defaultType);
}
private RoutingType getRoutingType(Symbol[] symbols) {
return getRoutingType(symbols, null);
}
private RoutingType getRoutingType(Symbol[] symbols, RoutingType defaultType) {
if (symbols != null) {
for (Symbol symbol : symbols) {
if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
@ -188,7 +201,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
}
return sessionSPI.getDefaultRoutingType(address);
if (defaultType != null) {
return defaultType;
} else {
return sessionSPI.getDefaultRoutingType(address);
}
}
/*
@ -223,7 +240,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
}
sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data);
flow(amqpCredits, minCreditRefresh);
} catch (Exception e) {

View File

@ -446,7 +446,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
if (!queueNames.isEmpty()) {
final List<SimpleString> convertedQueueNames = request.convertQueueNames(clientVersion, queueNames);
if (convertedQueueNames != queueNames) {
result = new BindingQueryResult(result.isExists(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers());
result = new BindingQueryResult(result.isExists(), result.getAddressInfo(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers());
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server;
import java.util.List;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
public class BindingQueryResult {
@ -34,12 +35,17 @@ public class BindingQueryResult {
private int defaultMaxConsumers;
private final AddressInfo addressInfo;
public BindingQueryResult(final boolean exists,
final AddressInfo addressInfo,
final List<SimpleString> queueNames,
final boolean autoCreateQueues,
final boolean autoCreateAddresses,
final boolean defaultPurgeOnNoConsumers,
final int defaultMaxConsumers) {
this.addressInfo = addressInfo;
this.exists = exists;
this.queueNames = queueNames;
@ -57,6 +63,10 @@ public class BindingQueryResult {
return exists;
}
public AddressInfo getAddressInfo() {
return addressInfo;
}
public boolean isAutoCreateQueues() {
return autoCreateQueues;
}

View File

@ -852,7 +852,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString bindAddress = new SimpleString(realAddress);
if (managementService != null) {
if (bindAddress.equals(managementService.getManagementAddress())) {
return new BindingQueryResult(true, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
return new BindingQueryResult(true, null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
}
}
@ -868,7 +868,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
return new BindingQueryResult(getAddressInfo(bindAddress) != null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
AddressInfo info = getAddressInfo(bindAddress);
return new BindingQueryResult(info != null, info, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
}
@Override

View File

@ -40,6 +40,11 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
SimpleString queue1 = new SimpleString("queue1");
SimpleString queue2 = new SimpleString("queue2");
@Override
protected boolean isAutoCreateQueues() {
return false;
}
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
@ -187,6 +192,8 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeWhenNoAddressCreatedAutoCreate() throws Exception {
// This test needs auto-create.. for that just clear the settings and use defaults
server.getAddressSettingsRepository().clear();
AddressSettings settings = new AddressSettings();
settings.setAutoCreateAddresses(true);
server.getAddressSettingsRepository().addMatch(address.toString(), settings);

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.math.BigInteger;
import java.util.Map;
import java.util.Random;
//adapted from https://issues.apache.org/jira/browse/ARTEMIS-1416
public class QueueAutoCreationTest extends JMSClientTestSupport {
Queue queue1;
Random random = new Random();
ActiveMQConnection testConn;
ClientSession clientSession;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
String randomSuffix = new BigInteger(130, random).toString(32);
testConn = (ActiveMQConnection)createCoreConnection();
clientSession = testConn.getSessionFactory().createSession();
queue1 = createQueue("queue1_" + randomSuffix);
}
@Override
@After
public void tearDown() throws Exception {
testConn.close();
super.tearDown();
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,CORE";
}
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
Configuration serverConfig = server.getConfiguration();
serverConfig.setJournalType(JournalType.NIO);
Map<String, AddressSettings> map = serverConfig.getAddressesSettings();
if (map.size() == 0) {
AddressSettings as = new AddressSettings();
map.put("#", as);
}
Map.Entry<String, AddressSettings> entry = map.entrySet().iterator().next();
AddressSettings settings = entry.getValue();
settings.setAutoCreateQueues(true);
System.out.println("server cofg, isauto? " + entry.getValue().isAutoCreateQueues());
}
protected Queue createQueue(final String queueName) throws Exception {
SimpleString address = SimpleString.toSimpleString(queueName);
clientSession.createAddress(address, RoutingType.ANYCAST, false);
return new ActiveMQQueue(queueName);
}
@Test(timeout = 30000)
public void testSmallString() throws Exception {
sendStringOfSize(1024, false);
}
@Test(timeout = 30000)
public void testHugeString() throws Exception {
//amqp doesn't support large message receive.
//using core to receive, it can verify
//that the large message is indeed stored in core
//via amqp send.
sendStringOfSize(1024 * 1024, true);
}
private void sendStringOfSize(int msgSize, boolean useCoreReceive) throws JMSException {
Connection conn = this.createConnection();
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = session.createProducer(queue1);
TextMessage m = session.createTextMessage();
m.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
StringBuffer buffer = new StringBuffer();
while (buffer.length() < msgSize) {
buffer.append(UUIDGenerator.getInstance().generateStringUUID());
}
final String originalString = buffer.toString();
m.setText(originalString);
prod.send(m);
conn.close();
if (useCoreReceive) {
conn = createCoreConnection();
} else {
conn = createConnection();
}
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = session.createConsumer(queue1);
conn.start();
TextMessage rm = (TextMessage) cons.receive(5000);
Assert.assertNotNull(rm);
String str = rm.getText();
Assert.assertEquals(originalString, str);
} finally {
if (conn != null) {
conn.close();
}
}
}
}

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
//adapted from https://issues.apache.org/jira/browse/ARTEMIS-1416
@RunWith(Parameterized.class)
public class LargeMessageQueueAutoCreationTest extends BasicOpenWireTest {
Queue queue1;
Random random = new Random();
ActiveMQConnection testConn;
ClientSession clientSession;
@Parameterized.Parameter
public boolean usingCore;
@Parameterized.Parameters(name = "isCore={0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{{true}, {false}});
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
String randomSuffix = new BigInteger(130, random).toString(32);
testConn = (ActiveMQConnection)coreCf.createConnection();
clientSession = testConn.getSessionFactory().createSession();
queue1 = createCoreQueue("queue1_" + randomSuffix);
}
@Override
@After
public void tearDown() throws Exception {
testConn.close();
super.tearDown();
}
@Override
protected void extraServerConfig(Configuration serverConfig) {
serverConfig.setJournalType(JournalType.NIO);
Map<String, AddressSettings> map = serverConfig.getAddressesSettings();
Map.Entry<String, AddressSettings> entry = map.entrySet().iterator().next();
AddressSettings settings = entry.getValue();
settings.setAutoCreateQueues(true);
System.out.println("server cofg, isauto? " + entry.getValue().isAutoCreateQueues());
}
protected Queue createCoreQueue(final String queueName) throws Exception {
SimpleString address = SimpleString.toSimpleString(queueName);
clientSession.createAddress(address, RoutingType.ANYCAST, false);
return new ActiveMQQueue(queueName);
}
@Test(timeout = 30000)
public void testSmallString() throws Exception {
sendStringOfSize(1024);
}
@Test(timeout = 30000)
public void testHugeString() throws Exception {
sendStringOfSize(1024 * 1024);
}
private void sendStringOfSize(int msgSize) throws JMSException {
ConnectionFactory factoryToUse = usingCore ? coreCf : factory;
Connection conn = factoryToUse.createConnection();
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = session.createProducer(queue1);
TextMessage m = session.createTextMessage();
m.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
StringBuffer buffer = new StringBuffer();
while (buffer.length() < msgSize) {
buffer.append(UUIDGenerator.getInstance().generateStringUUID());
}
final String originalString = buffer.toString();
m.setText(originalString);
prod.send(m);
conn.close();
conn = factoryToUse.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = session.createConsumer(queue1);
conn.start();
TextMessage rm = (TextMessage) cons.receive(5000);
Assert.assertNotNull(rm);
String str = rm.getText();
Assert.assertEquals(originalString, str);
} finally {
if (conn != null) {
conn.close();
}
}
}
}

View File

@ -161,8 +161,7 @@ public abstract class PubSubTestCase extends JMSTestCase {
subscriberTCF = null;
subscriberSession = null;
subscriberConnection = null;
super.tearDown();
}
super.tearDown();
}
}