diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index 3121a88333..9f86e493e2 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -420,7 +420,16 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To throw new InvalidDestinationException("Destination " + address + " does not exist"); } } else { - connection.addKnownDestination(address); + ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address); + if (queueQuery.isExists()) { + connection.addKnownDestination(address); + } else if (destination.isQueue() && query.isAutoCreateQueues()) { + if (destination.isTemporary()) { + clientSession.createTemporaryQueue(address, RoutingType.ANYCAST, address); + } else { + clientSession.createQueue(address, RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), query.isDefaultPurgeOnNoConsumers()); + } + } } } catch (ActiveMQQueueExistsException e) { // The queue was created by another client/admin between the query check and send create queue packet diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 19f6351f5a..667d57aeee 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -48,6 +48,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; @@ -268,9 +269,11 @@ public class AMQPSessionCallback implements SessionCallback { queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); } - if (queueQueryResult.getRoutingType() != routingType) { + // if auto-create we will return whatever type was used before + if (!queueQueryResult.isAutoCreated() && queueQueryResult.getRoutingType() != routingType) { throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType); } + return queueQueryResult; } @@ -284,11 +287,14 @@ public class AMQPSessionCallback implements SessionCallback { // The address may have been created by another thread in the mean time. Catch and do nothing. } bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); - } else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) { - try { - serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true); - } catch (ActiveMQQueueExistsException e) { - // The queue may have been created by another thread in the mean time. Catch and do nothing. + } else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) { + QueueQueryResult queueBinding = serverSession.executeQueueQuery(simpleAddress); + if (!queueBinding.isExists()) { + try { + serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true); + } catch (ActiveMQQueueExistsException e) { + // The queue may have been created by another thread in the mean time. Catch and do nothing. + } } bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); } @@ -383,7 +389,8 @@ public class AMQPSessionCallback implements SessionCallback { ((ServerConsumer) consumer).receiveCredits(-1); } - public void serverSend(final Transaction transaction, + public void serverSend(final ProtonServerReceiverContext context, + final Transaction transaction, final Receiver receiver, final Delivery delivery, String address, @@ -394,14 +401,17 @@ public class AMQPSessionCallback implements SessionCallback { message.setAddress(new SimpleString(address)); } else { // Anonymous relay must set a To value - if (message.getAddress() == null) { + address = message.getAddress(); + if (address == null) { rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer"); return; } + } - if (!bindingQuery(message.getAddress().toString(), RoutingType.ANYCAST)) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); - } + //here check queue-autocreation + RoutingType routingType = context.getRoutingType(receiver, RoutingType.ANYCAST); + if (!bindingQuery(address, routingType)) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); } OperationContext oldcontext = recoverContext(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index bcab2eaab9..15318d5b7a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -96,14 +96,17 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements // We don't currently support SECOND so enforce that the answer is anlways FIRST receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + RoutingType defRoutingType; + if (target != null) { if (target.getDynamic()) { + defRoutingType = getRoutingType(target.getCapabilities()); // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and // will be deleted on closing of the session address = sessionSPI.tempQueueName(); try { - sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities())); + sessionSPI.createTemporaryQueue(address, defRoutingType); } catch (ActiveMQSecurityException e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(e.getMessage()); } catch (Exception e) { @@ -118,8 +121,9 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements address = target.getAddress(); if (address != null && !address.isEmpty()) { + defRoutingType = getRoutingType(target.getCapabilities()); try { - if (!sessionSPI.bindingQuery(address, getRoutingType(target.getCapabilities()))) { + if (!sessionSPI.bindingQuery(address, defRoutingType)) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); } } catch (ActiveMQAMQPNotFoundException e) { @@ -177,7 +181,16 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements flow(amqpCredits, minCreditRefresh); } + public RoutingType getRoutingType(Receiver receiver, RoutingType defaultType) { + org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); + return target != null ? getRoutingType(target.getCapabilities(), defaultType) : getRoutingType((Symbol[])null, defaultType); + } + private RoutingType getRoutingType(Symbol[] symbols) { + return getRoutingType(symbols, null); + } + + private RoutingType getRoutingType(Symbol[] symbols, RoutingType defaultType) { if (symbols != null) { for (Symbol symbol : symbols) { if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { @@ -188,7 +201,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } } - return sessionSPI.getDefaultRoutingType(address); + if (defaultType != null) { + return defaultType; + } else { + return sessionSPI.getDefaultRoutingType(address); + } } /* @@ -223,7 +240,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements tx = this.sessionSPI.getTransaction(txState.getTxnId(), false); } - sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data); + sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data); flow(amqpCredits, minCreditRefresh); } catch (Exception e) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 8f3c5bea88..75ef0714f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -446,7 +446,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { if (!queueNames.isEmpty()) { final List convertedQueueNames = request.convertQueueNames(clientVersion, queueNames); if (convertedQueueNames != queueNames) { - result = new BindingQueryResult(result.isExists(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); + result = new BindingQueryResult(result.isExists(), result.getAddressInfo(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java index 7eed31f5e3..7f340b1549 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server; import java.util.List; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; public class BindingQueryResult { @@ -34,12 +35,17 @@ public class BindingQueryResult { private int defaultMaxConsumers; + private final AddressInfo addressInfo; + public BindingQueryResult(final boolean exists, + final AddressInfo addressInfo, final List queueNames, final boolean autoCreateQueues, final boolean autoCreateAddresses, final boolean defaultPurgeOnNoConsumers, final int defaultMaxConsumers) { + this.addressInfo = addressInfo; + this.exists = exists; this.queueNames = queueNames; @@ -57,6 +63,10 @@ public class BindingQueryResult { return exists; } + public AddressInfo getAddressInfo() { + return addressInfo; + } + public boolean isAutoCreateQueues() { return autoCreateQueues; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 587cff9e38..8077ae9700 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -852,7 +852,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { SimpleString bindAddress = new SimpleString(realAddress); if (managementService != null) { if (bindAddress.equals(managementService.getManagementAddress())) { - return new BindingQueryResult(true, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers); + return new BindingQueryResult(true, null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers); } } @@ -868,7 +868,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - return new BindingQueryResult(getAddressInfo(bindAddress) != null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers); + AddressInfo info = getAddressInfo(bindAddress); + + return new BindingQueryResult(info != null, info, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java index e10c73d97b..7f0fdd8dd8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java @@ -40,6 +40,11 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { SimpleString queue1 = new SimpleString("queue1"); SimpleString queue2 = new SimpleString("queue2"); + @Override + protected boolean isAutoCreateQueues() { + return false; + } + @Test(timeout = 60000) public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); @@ -187,6 +192,8 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testConsumeWhenNoAddressCreatedAutoCreate() throws Exception { + // This test needs auto-create.. for that just clear the settings and use defaults + server.getAddressSettingsRepository().clear(); AddressSettings settings = new AddressSettings(); settings.setAutoCreateAddresses(true); server.getAddressSettingsRepository().addMatch(address.toString(), settings); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java new file mode 100644 index 0000000000..f6c8b22152 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQQueue; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.math.BigInteger; +import java.util.Map; +import java.util.Random; + +//adapted from https://issues.apache.org/jira/browse/ARTEMIS-1416 +public class QueueAutoCreationTest extends JMSClientTestSupport { + + Queue queue1; + Random random = new Random(); + ActiveMQConnection testConn; + ClientSession clientSession; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + String randomSuffix = new BigInteger(130, random).toString(32); + testConn = (ActiveMQConnection)createCoreConnection(); + clientSession = testConn.getSessionFactory().createSession(); + queue1 = createQueue("queue1_" + randomSuffix); + } + + @Override + @After + public void tearDown() throws Exception { + testConn.close(); + super.tearDown(); + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,CORE"; + } + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + Configuration serverConfig = server.getConfiguration(); + serverConfig.setJournalType(JournalType.NIO); + Map map = serverConfig.getAddressesSettings(); + if (map.size() == 0) { + AddressSettings as = new AddressSettings(); + map.put("#", as); + } + Map.Entry entry = map.entrySet().iterator().next(); + AddressSettings settings = entry.getValue(); + settings.setAutoCreateQueues(true); + System.out.println("server cofg, isauto? " + entry.getValue().isAutoCreateQueues()); + } + + + protected Queue createQueue(final String queueName) throws Exception { + SimpleString address = SimpleString.toSimpleString(queueName); + clientSession.createAddress(address, RoutingType.ANYCAST, false); + return new ActiveMQQueue(queueName); + } + + @Test(timeout = 30000) + public void testSmallString() throws Exception { + sendStringOfSize(1024, false); + } + + @Test(timeout = 30000) + public void testHugeString() throws Exception { + //amqp doesn't support large message receive. + //using core to receive, it can verify + //that the large message is indeed stored in core + //via amqp send. + sendStringOfSize(1024 * 1024, true); + } + + private void sendStringOfSize(int msgSize, boolean useCoreReceive) throws JMSException { + + Connection conn = this.createConnection(); + + try { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer prod = session.createProducer(queue1); + + TextMessage m = session.createTextMessage(); + + m.setJMSDeliveryMode(DeliveryMode.PERSISTENT); + + StringBuffer buffer = new StringBuffer(); + while (buffer.length() < msgSize) { + buffer.append(UUIDGenerator.getInstance().generateStringUUID()); + } + + final String originalString = buffer.toString(); + + m.setText(originalString); + + prod.send(m); + + conn.close(); + + if (useCoreReceive) { + conn = createCoreConnection(); + } else { + conn = createConnection(); + } + + session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer cons = session.createConsumer(queue1); + + conn.start(); + + TextMessage rm = (TextMessage) cons.receive(5000); + Assert.assertNotNull(rm); + + String str = rm.getText(); + Assert.assertEquals(originalString, str); + } finally { + if (conn != null) { + conn.close(); + } + } + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java new file mode 100644 index 0000000000..b3f224d07f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQQueue; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Random; + +//adapted from https://issues.apache.org/jira/browse/ARTEMIS-1416 +@RunWith(Parameterized.class) +public class LargeMessageQueueAutoCreationTest extends BasicOpenWireTest { + + Queue queue1; + Random random = new Random(); + ActiveMQConnection testConn; + ClientSession clientSession; + + @Parameterized.Parameter + public boolean usingCore; + + @Parameterized.Parameters(name = "isCore={0}") + public static Collection params() { + return Arrays.asList(new Object[][]{{true}, {false}}); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + String randomSuffix = new BigInteger(130, random).toString(32); + testConn = (ActiveMQConnection)coreCf.createConnection(); + clientSession = testConn.getSessionFactory().createSession(); + queue1 = createCoreQueue("queue1_" + randomSuffix); + } + + @Override + @After + public void tearDown() throws Exception { + testConn.close(); + super.tearDown(); + } + + @Override + protected void extraServerConfig(Configuration serverConfig) { + serverConfig.setJournalType(JournalType.NIO); + Map map = serverConfig.getAddressesSettings(); + Map.Entry entry = map.entrySet().iterator().next(); + AddressSettings settings = entry.getValue(); + settings.setAutoCreateQueues(true); + System.out.println("server cofg, isauto? " + entry.getValue().isAutoCreateQueues()); + } + + + protected Queue createCoreQueue(final String queueName) throws Exception { + SimpleString address = SimpleString.toSimpleString(queueName); + clientSession.createAddress(address, RoutingType.ANYCAST, false); + return new ActiveMQQueue(queueName); + } + + @Test(timeout = 30000) + public void testSmallString() throws Exception { + sendStringOfSize(1024); + } + + @Test(timeout = 30000) + public void testHugeString() throws Exception { + sendStringOfSize(1024 * 1024); + } + + private void sendStringOfSize(int msgSize) throws JMSException { + + ConnectionFactory factoryToUse = usingCore ? coreCf : factory; + + Connection conn = factoryToUse.createConnection(); + + try { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer prod = session.createProducer(queue1); + + TextMessage m = session.createTextMessage(); + + m.setJMSDeliveryMode(DeliveryMode.PERSISTENT); + + StringBuffer buffer = new StringBuffer(); + while (buffer.length() < msgSize) { + buffer.append(UUIDGenerator.getInstance().generateStringUUID()); + } + + final String originalString = buffer.toString(); + + m.setText(originalString); + + prod.send(m); + + conn.close(); + + conn = factoryToUse.createConnection(); + + session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer cons = session.createConsumer(queue1); + + conn.start(); + + TextMessage rm = (TextMessage) cons.receive(5000); + Assert.assertNotNull(rm); + + String str = rm.getText(); + Assert.assertEquals(originalString, str); + } finally { + if (conn != null) { + conn.close(); + } + } + } +} \ No newline at end of file diff --git a/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java b/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java index d5792c0dc6..d34002441d 100644 --- a/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java +++ b/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java @@ -161,8 +161,7 @@ public abstract class PubSubTestCase extends JMSTestCase { subscriberTCF = null; subscriberSession = null; subscriberConnection = null; + super.tearDown(); } - - super.tearDown(); } }