diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index dd10e5bcba..f1b9cefd22 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -535,7 +535,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi queueName, routingType, null, - true, + false, false, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index 4c1d33564f..b814bc23fa 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -36,6 +36,7 @@ import javax.jms.TopicPublisher; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -421,7 +422,10 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To } else { connection.addKnownDestination(address); } - } catch (ActiveMQException e) { + } catch (ActiveMQQueueExistsException e) { + // The queue was created by another client/admin between the query check and send create queue packet + } + catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 3e9b76f06c..a25215e4dc 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -301,15 +301,21 @@ public class ActiveMQSession implements QueueSession, TopicSession { ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress()); if (!response.isExists()) { - if (jbd.isQueue() && response.isAutoCreateJmsQueues()) { - // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers) - session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true); - session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true); - } else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) { - session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true); - } else { - throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); + try { + if (jbd.isQueue() && response.isAutoCreateJmsQueues()) { + // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers) + session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true); + session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true); + } else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) { + session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true); + } else { + throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); + } } + catch (ActiveMQQueueExistsException e) { + // Queue was created between our query and create queue request. Ignore. + } + } } @@ -647,7 +653,12 @@ public class ActiveMQSession implements QueueSession, TopicSession { */ if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) { if (response.isAutoCreateJmsQueues()) { - session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true); + try { + session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true); + } + catch (ActiveMQQueueExistsException e) { + // The queue was created by another client/admin between the query check and send create queue packet + } } else { throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist"); } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 3a2ad7e939..c266e76314 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -83,7 +83,7 @@ public class MQTTPublishManager { } private void createManagementAddress() { - managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId()); + managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId()); } private void createManagementQueue() throws Exception { @@ -113,10 +113,13 @@ public class MQTTPublishManager { if (qos == 0) { sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos); session.getServerSession().acknowledge(consumer.getID(), message.getMessageID()); - } else { + } else if (qos == 1 || qos == 2) { int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID()); outboundStore.publish(mqttid, message.getMessageID(), consumer.getID()); sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos); + } else { + // Client must have disconnected and it's Subscription QoS cleared + consumer.individualCancel(message.getMessageID(), false); } } } @@ -231,7 +234,14 @@ public class MQTTPublishManager { } private int decideQoS(ServerMessage message, ServerConsumer consumer) { - int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID()); + + int subscriptionQoS = -1; + try { + subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID()); + } catch (NullPointerException e) { + // This can happen if the client disconnected during a server send. + return subscriptionQoS; + } int qos = 2; if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index b3542d33d2..c4b8b94c80 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -94,7 +94,7 @@ public class MQTTSubscriptionManager { Queue q = session.getServer().locateQueue(queue); if (q == null) { - q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, true); + q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false); } else { if (q.isDeleteOnNoConsumers()) { throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java index c3aa9deac2..96844814e0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java @@ -48,6 +48,10 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres for (RoutingType routingType : routingTypes) { sb.append(routingType.toString() + ","); } + if (sb.charAt(sb.length() - 1) == ',') { + sb.deleteCharAt(sb.length() - 1); + } + sb.append("}"); sb.append(", autoCreated=" + autoCreated + "]"); return sb.toString(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 06852ceaba..abcbb89b3d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2314,16 +2314,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { recoverStoredConfigs(); + Map addressBindingInfosMap = new HashMap<>(); + + journalLoader.initAddresses(addressBindingInfosMap, addressBindingInfos); + Map queueBindingInfosMap = new HashMap<>(); journalLoader.initQueues(queueBindingInfosMap, queueBindingInfos); journalLoader.handleGroupingBindings(groupingInfos); - Map addressBindingInfosMap = new HashMap<>(); - - journalLoader.initAddresses(addressBindingInfosMap, addressBindingInfos); - Map>> duplicateIDMap = new HashMap<>(); HashSet> pendingLargeMessages = new HashSet<>(); @@ -2473,10 +2473,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType); AddressInfo info = postOffice.getAddressInfo(addressName); + boolean addressAlreadyExists = true; + if (info == null) { if (autoCreateAddress) { - postOffice.addAddressInfo(defaultAddressInfo.setAutoCreated(true)); - info = postOffice.getAddressInfo(addressName); + createAddressInfo(defaultAddressInfo.setAutoCreated(true)); + addressAlreadyExists = false; } else { throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName); } @@ -2486,12 +2488,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { final Queue queue = queueFactory.createQueueWith(queueConfig); - boolean addressAlreadyExists = true; - AddressInfo addressInfo = postOffice.getAddressInfo(queue.getAddress()); if (addressInfo == null) { - postOffice.addAddressInfo(new AddressInfo(queue.getAddress())); - addressAlreadyExists = false; + createAddressInfo(new AddressInfo(queue.getAddress())); } else { if (!addressInfo.getRoutingTypes().contains(routingType)) { throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, addressInfo.getName().toString(), addressInfo.getRoutingTypes()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java index 4e3f689a4b..0beaab74f1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java @@ -52,6 +52,6 @@ public class AddressConfigTest extends ActiveMQTestBase { Set routingTypeSet = new HashSet<>(); routingTypeSet.add(RoutingType.MULTICAST); - assertEquals(RoutingType.MULTICAST, addressInfo.getRoutingTypes()); + assertEquals(routingTypeSet, addressInfo.getRoutingTypes()); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java new file mode 100644 index 0000000000..9208386b3a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.addressing; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.TimeUtils; +import org.junit.Before; +import org.junit.Test; + +public class AnycastTest extends ActiveMQTestBase { + + private SimpleString baseAddress = new SimpleString("anycast.address"); + + private AddressInfo addressInfo; + + private ActiveMQServer server; + + private ClientSessionFactory sessionFactory; + + @Before + public void setup() throws Exception { + server = createServer(true); + server.start(); + + server.waitForActivation(10, TimeUnit.SECONDS); + + ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + sessionFactory = sl.createSessionFactory(); + + addSessionFactory(sessionFactory); + + addressInfo = new AddressInfo(baseAddress); + addressInfo.addRoutingType(RoutingType.ANYCAST); + server.createOrUpdateAddressInfo(addressInfo); + } + + @Test + public void testTxCommitReceive() throws Exception { + + Queue q1 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + Queue q2 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + + ClientSession session = sessionFactory.createSession(false, false); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(baseAddress); + + final int num = 10; + + for (int i = 0; i < num; i++) { + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + m.getBodyBuffer().writeString("AnyCast" + i); + producer.send(m); + } + assertNull(consumer1.receive(200)); + assertNull(consumer2.receive(200)); + session.commit(); + + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q1.getMessageCount())); + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q2.getMessageCount())); + + ClientConsumer[] consumers = new ClientConsumer[]{consumer1, consumer2}; + for (int i = 0; i < consumers.length; i++) { + + for (int j = 0; j < num / 2; j++) { + ClientMessage m = consumers[i].receive(2000); + assertNotNull(m); + System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString()); + } + + assertNull(consumers[i].receive(200)); + session.commit(); + + assertNull(consumers[i].receive(200)); + } + + q1.deleteQueue(); + q2.deleteQueue(); + } + + @Test + public void testTxRollbackReceive() throws Exception { + + Queue q1 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + Queue q2 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true); + + ClientSession session = sessionFactory.createSession(false, false); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(baseAddress); + + final int num = 10; + + for (int i = 0; i < num; i++) { + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + m.getBodyBuffer().writeString("AnyCast" + i); + producer.send(m); + } + assertNull(consumer1.receive(200)); + assertNull(consumer2.receive(200)); + session.commit(); + session.close(); + + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q1.getMessageCount())); + assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q2.getMessageCount())); + + ClientSession session1 = sessionFactory.createSession(false, false); + ClientSession session2 = sessionFactory.createSession(false, false); + session1.start(); + session2.start(); + + consumer1 = session1.createConsumer(q1.getName()); + consumer2 = session2.createConsumer(q2.getName()); + + ClientConsumer[] consumers = new ClientConsumer[]{consumer1, consumer2}; + ClientSession[] sessions = new ClientSession[]{session1, session2}; + Queue[] queues = new Queue[]{q1, q2}; + + for (int i = 0; i < consumers.length; i++) { + + for (int j = 0; j < num / 2; j++) { + ClientMessage m = consumers[i].receive(2000); + assertNotNull(m); + System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString()); + } + + assertNull(consumers[i].receive(200)); + sessions[i].rollback(); + sessions[i].close(); + + sessions[i] = sessionFactory.createSession(false, false); + sessions[i].start(); + + //receive same after rollback + consumers[i] = sessions[i].createConsumer(queues[i].getName()); + + for (int j = 0; j < num / 2; j++) { + ClientMessage m = consumers[i].receive(2000); + assertNotNull(m); + System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString()); + } + + assertNull(consumers[i].receive(200)); + sessions[i].commit(); + + assertNull(consumers[i].receive(200)); + sessions[i].close(); + } + + q1.deleteQueue(); + q2.deleteQueue(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java index a721acace6..ccbc4b260c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JmsNettyNioStressTest.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; @@ -36,6 +37,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -128,8 +130,10 @@ public class JmsNettyNioStressTest extends ActiveMQTestBase { // create the 2 queues used in the test ClientSessionFactory sf = locator.createSessionFactory(transpConf); ClientSession session = sf.createTransactedSession(); - session.createQueue("queue", "queue"); - session.createQueue("queue2", "queue2"); + session.createAddress(SimpleString.toSimpleString("queue"), RoutingType.ANYCAST, false); + session.createAddress(SimpleString.toSimpleString("queue2"), RoutingType.ANYCAST, false); + session.createQueue("queue", RoutingType.ANYCAST, "queue"); + session.createQueue("queue2", RoutingType.ANYCAST, "queue2"); session.commit(); sf.close(); session.close(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java index e550bef8b8..07385625fd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java @@ -26,6 +26,7 @@ import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.SpawnedVMSupport; @@ -39,7 +40,7 @@ public class PendingDeliveriesTest extends ClientTestBase { @Before public void createQueue() throws Exception { - server.createQueue(SimpleString.toSimpleString("queue1"), SimpleString.toSimpleString("queue1"), null, true, false); + server.createQueue(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST, SimpleString.toSimpleString("queue1"), null, true, false); } @After diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java index 6e960f2505..9fa9ac4c6c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java @@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.registry.JndiBindingRegistry; import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQSession; @@ -198,7 +199,7 @@ public class JMSFailoverTest extends ActiveMQTestBase { SimpleString jmsQueueName = new SimpleString("myqueue"); - coreSession.createQueue(jmsQueueName, jmsQueueName, null, true); + coreSession.createQueue(jmsQueueName, RoutingType.ANYCAST, jmsQueueName, null, true); Queue queue = sess.createQueue("myqueue"); @@ -271,7 +272,7 @@ public class JMSFailoverTest extends ActiveMQTestBase { SimpleString jmsQueueName = new SimpleString("myqueue"); - coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, true); + coreSessionLive.createQueue(jmsQueueName, RoutingType.ANYCAST, jmsQueueName, null, true); Queue queue = sessLive.createQueue("myqueue"); @@ -377,7 +378,7 @@ public class JMSFailoverTest extends ActiveMQTestBase { } }; - coreSession.createQueue(QUEUE, QUEUE, true); + coreSession.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, true); Queue queue = sess.createQueue("somequeue"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index c342853073..58d75d8a60 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -1096,8 +1096,8 @@ public class MQTTTest extends MQTTTestSupport { connection.start(); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Queue queue = s.createQueue(destinationName); - MessageProducer producer = s.createProducer(queue); + javax.jms.Topic topic = s.createTopic(destinationName); + MessageProducer producer = s.createProducer(topic); // send retained message from JMS final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; @@ -1626,10 +1626,7 @@ public class MQTTTest extends MQTTTestSupport { SimpleString coreAddress = new SimpleString("foo.bar"); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; - AddressInfo addressInfo = new AddressInfo(coreAddress); - getServer().createOrUpdateAddressInfo(addressInfo); - - getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, false); + getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, true); MQTT mqtt = createMQTTConnection(); mqtt.setClientId(clientId); @@ -1675,7 +1672,7 @@ public class MQTTTest extends MQTTTestSupport { try { String clientId = "testMqtt"; SimpleString coreAddress = new SimpleString("foo.bar"); - getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, false); + getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 4fccfa3206..a6bb55f80c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -49,6 +49,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -692,7 +693,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { Connection conn1 = null; SimpleString durableQueue = new SimpleString("exampleQueue"); - this.server.createQueue(durableQueue, durableQueue, null, true, false); + this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false); Queue queue = ActiveMQJMSClient.createQueue("exampleQueue"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java index a672bb1334..2e77bb9389 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java @@ -59,7 +59,7 @@ public class ResourceLimitTest extends ActiveMQTestBase { ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager(); securityManager.getConfiguration().addUser("myUser", "password"); securityManager.getConfiguration().addRole("myUser", "arole"); - Role role = new Role("arole", false, false, false, false, true, true, false, true, false, false); + Role role = new Role("arole", false, false, false, false, true, true, false, true, false, true); Set roles = new HashSet<>(); roles.add(role); server.getSecurityRepository().addMatch("#", roles); diff --git a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java index 2ada3bea7e..bcff21bf1f 100644 --- a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java +++ b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java @@ -149,7 +149,7 @@ public class AbstractAdmin implements Admin { @Override public void createTopic(final String name) { try { - invokeSyncOperation(ResourceNames.BROKER, "createAddress", name, new Object[]{"MULTICAST"}); + invokeSyncOperation(ResourceNames.BROKER, "createAddress", name, "MULTICAST"); } catch (Exception e) { throw new IllegalStateException(e); }