ARTEMIS-789 Fix Failing Tests
This commit is contained in:
parent
61aec1ba74
commit
683ae68989
|
@ -535,7 +535,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||||
queueName,
|
queueName,
|
||||||
routingType,
|
routingType,
|
||||||
null,
|
null,
|
||||||
true,
|
false,
|
||||||
false,
|
false,
|
||||||
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
|
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
|
||||||
ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
|
ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
|
||||||
|
|
|
@ -36,6 +36,7 @@ import javax.jms.TopicPublisher;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
@ -421,7 +422,10 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
|
||||||
} else {
|
} else {
|
||||||
connection.addKnownDestination(address);
|
connection.addKnownDestination(address);
|
||||||
}
|
}
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQQueueExistsException e) {
|
||||||
|
// The queue was created by another client/admin between the query check and send create queue packet
|
||||||
|
}
|
||||||
|
catch (ActiveMQException e) {
|
||||||
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -301,15 +301,21 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());
|
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());
|
||||||
|
|
||||||
if (!response.isExists()) {
|
if (!response.isExists()) {
|
||||||
if (jbd.isQueue() && response.isAutoCreateJmsQueues()) {
|
try {
|
||||||
// perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers)
|
if (jbd.isQueue() && response.isAutoCreateJmsQueues()) {
|
||||||
session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true);
|
// perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers)
|
||||||
session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true);
|
session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true);
|
||||||
} else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) {
|
session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true);
|
||||||
session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true);
|
} else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) {
|
||||||
} else {
|
session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true);
|
||||||
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
|
} else {
|
||||||
|
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
catch (ActiveMQQueueExistsException e) {
|
||||||
|
// Queue was created between our query and create queue request. Ignore.
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -647,7 +653,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
*/
|
*/
|
||||||
if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
|
if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
|
||||||
if (response.isAutoCreateJmsQueues()) {
|
if (response.isAutoCreateJmsQueues()) {
|
||||||
session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true);
|
try {
|
||||||
|
session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true);
|
||||||
|
}
|
||||||
|
catch (ActiveMQQueueExistsException e) {
|
||||||
|
// The queue was created by another client/admin between the query check and send create queue packet
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
|
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,7 +83,7 @@ public class MQTTPublishManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createManagementAddress() {
|
private void createManagementAddress() {
|
||||||
managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId());
|
managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createManagementQueue() throws Exception {
|
private void createManagementQueue() throws Exception {
|
||||||
|
@ -113,10 +113,13 @@ public class MQTTPublishManager {
|
||||||
if (qos == 0) {
|
if (qos == 0) {
|
||||||
sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
|
sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
|
||||||
session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
|
session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
|
||||||
} else {
|
} else if (qos == 1 || qos == 2) {
|
||||||
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
|
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
|
||||||
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
|
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
|
||||||
sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
|
sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
|
||||||
|
} else {
|
||||||
|
// Client must have disconnected and it's Subscription QoS cleared
|
||||||
|
consumer.individualCancel(message.getMessageID(), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -231,7 +234,14 @@ public class MQTTPublishManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private int decideQoS(ServerMessage message, ServerConsumer consumer) {
|
private int decideQoS(ServerMessage message, ServerConsumer consumer) {
|
||||||
int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
|
|
||||||
|
int subscriptionQoS = -1;
|
||||||
|
try {
|
||||||
|
subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
|
||||||
|
} catch (NullPointerException e) {
|
||||||
|
// This can happen if the client disconnected during a server send.
|
||||||
|
return subscriptionQoS;
|
||||||
|
}
|
||||||
|
|
||||||
int qos = 2;
|
int qos = 2;
|
||||||
if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) {
|
if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) {
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class MQTTSubscriptionManager {
|
||||||
|
|
||||||
Queue q = session.getServer().locateQueue(queue);
|
Queue q = session.getServer().locateQueue(queue);
|
||||||
if (q == null) {
|
if (q == null) {
|
||||||
q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, true);
|
q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
|
||||||
} else {
|
} else {
|
||||||
if (q.isDeleteOnNoConsumers()) {
|
if (q.isDeleteOnNoConsumers()) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);
|
throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);
|
||||||
|
|
|
@ -48,6 +48,10 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
|
||||||
for (RoutingType routingType : routingTypes) {
|
for (RoutingType routingType : routingTypes) {
|
||||||
sb.append(routingType.toString() + ",");
|
sb.append(routingType.toString() + ",");
|
||||||
}
|
}
|
||||||
|
if (sb.charAt(sb.length() - 1) == ',') {
|
||||||
|
sb.deleteCharAt(sb.length() - 1);
|
||||||
|
}
|
||||||
|
sb.append("}");
|
||||||
sb.append(", autoCreated=" + autoCreated + "]");
|
sb.append(", autoCreated=" + autoCreated + "]");
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
|
@ -2314,16 +2314,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
recoverStoredConfigs();
|
recoverStoredConfigs();
|
||||||
|
|
||||||
|
Map<Long, AddressBindingInfo> addressBindingInfosMap = new HashMap<>();
|
||||||
|
|
||||||
|
journalLoader.initAddresses(addressBindingInfosMap, addressBindingInfos);
|
||||||
|
|
||||||
Map<Long, QueueBindingInfo> queueBindingInfosMap = new HashMap<>();
|
Map<Long, QueueBindingInfo> queueBindingInfosMap = new HashMap<>();
|
||||||
|
|
||||||
journalLoader.initQueues(queueBindingInfosMap, queueBindingInfos);
|
journalLoader.initQueues(queueBindingInfosMap, queueBindingInfos);
|
||||||
|
|
||||||
journalLoader.handleGroupingBindings(groupingInfos);
|
journalLoader.handleGroupingBindings(groupingInfos);
|
||||||
|
|
||||||
Map<Long, AddressBindingInfo> addressBindingInfosMap = new HashMap<>();
|
|
||||||
|
|
||||||
journalLoader.initAddresses(addressBindingInfosMap, addressBindingInfos);
|
|
||||||
|
|
||||||
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
|
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
|
||||||
|
|
||||||
HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<>();
|
HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<>();
|
||||||
|
@ -2473,10 +2473,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
|
defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
|
||||||
AddressInfo info = postOffice.getAddressInfo(addressName);
|
AddressInfo info = postOffice.getAddressInfo(addressName);
|
||||||
|
|
||||||
|
boolean addressAlreadyExists = true;
|
||||||
|
|
||||||
if (info == null) {
|
if (info == null) {
|
||||||
if (autoCreateAddress) {
|
if (autoCreateAddress) {
|
||||||
postOffice.addAddressInfo(defaultAddressInfo.setAutoCreated(true));
|
createAddressInfo(defaultAddressInfo.setAutoCreated(true));
|
||||||
info = postOffice.getAddressInfo(addressName);
|
addressAlreadyExists = false;
|
||||||
} else {
|
} else {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
|
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
|
||||||
}
|
}
|
||||||
|
@ -2486,12 +2488,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
final Queue queue = queueFactory.createQueueWith(queueConfig);
|
final Queue queue = queueFactory.createQueueWith(queueConfig);
|
||||||
|
|
||||||
boolean addressAlreadyExists = true;
|
|
||||||
|
|
||||||
AddressInfo addressInfo = postOffice.getAddressInfo(queue.getAddress());
|
AddressInfo addressInfo = postOffice.getAddressInfo(queue.getAddress());
|
||||||
if (addressInfo == null) {
|
if (addressInfo == null) {
|
||||||
postOffice.addAddressInfo(new AddressInfo(queue.getAddress()));
|
createAddressInfo(new AddressInfo(queue.getAddress()));
|
||||||
addressAlreadyExists = false;
|
|
||||||
} else {
|
} else {
|
||||||
if (!addressInfo.getRoutingTypes().contains(routingType)) {
|
if (!addressInfo.getRoutingTypes().contains(routingType)) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, addressInfo.getName().toString(), addressInfo.getRoutingTypes());
|
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, addressInfo.getName().toString(), addressInfo.getRoutingTypes());
|
||||||
|
|
|
@ -52,6 +52,6 @@ public class AddressConfigTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
Set<RoutingType> routingTypeSet = new HashSet<>();
|
Set<RoutingType> routingTypeSet = new HashSet<>();
|
||||||
routingTypeSet.add(RoutingType.MULTICAST);
|
routingTypeSet.add(RoutingType.MULTICAST);
|
||||||
assertEquals(RoutingType.MULTICAST, addressInfo.getRoutingTypes());
|
assertEquals(routingTypeSet, addressInfo.getRoutingTypes());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,188 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.addressing;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.utils.TimeUtils;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class AnycastTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
private SimpleString baseAddress = new SimpleString("anycast.address");
|
||||||
|
|
||||||
|
private AddressInfo addressInfo;
|
||||||
|
|
||||||
|
private ActiveMQServer server;
|
||||||
|
|
||||||
|
private ClientSessionFactory sessionFactory;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
server = createServer(true);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
server.waitForActivation(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
|
||||||
|
sessionFactory = sl.createSessionFactory();
|
||||||
|
|
||||||
|
addSessionFactory(sessionFactory);
|
||||||
|
|
||||||
|
addressInfo = new AddressInfo(baseAddress);
|
||||||
|
addressInfo.addRoutingType(RoutingType.ANYCAST);
|
||||||
|
server.createOrUpdateAddressInfo(addressInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTxCommitReceive() throws Exception {
|
||||||
|
|
||||||
|
Queue q1 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
|
||||||
|
Queue q2 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
|
||||||
|
|
||||||
|
ClientSession session = sessionFactory.createSession(false, false);
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
ClientConsumer consumer1 = session.createConsumer(q1.getName());
|
||||||
|
ClientConsumer consumer2 = session.createConsumer(q2.getName());
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(baseAddress);
|
||||||
|
|
||||||
|
final int num = 10;
|
||||||
|
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
|
ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
|
||||||
|
m.getBodyBuffer().writeString("AnyCast" + i);
|
||||||
|
producer.send(m);
|
||||||
|
}
|
||||||
|
assertNull(consumer1.receive(200));
|
||||||
|
assertNull(consumer2.receive(200));
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q1.getMessageCount()));
|
||||||
|
assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q2.getMessageCount()));
|
||||||
|
|
||||||
|
ClientConsumer[] consumers = new ClientConsumer[]{consumer1, consumer2};
|
||||||
|
for (int i = 0; i < consumers.length; i++) {
|
||||||
|
|
||||||
|
for (int j = 0; j < num / 2; j++) {
|
||||||
|
ClientMessage m = consumers[i].receive(2000);
|
||||||
|
assertNotNull(m);
|
||||||
|
System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNull(consumers[i].receive(200));
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
assertNull(consumers[i].receive(200));
|
||||||
|
}
|
||||||
|
|
||||||
|
q1.deleteQueue();
|
||||||
|
q2.deleteQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTxRollbackReceive() throws Exception {
|
||||||
|
|
||||||
|
Queue q1 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
|
||||||
|
Queue q2 = server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
|
||||||
|
|
||||||
|
ClientSession session = sessionFactory.createSession(false, false);
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
ClientConsumer consumer1 = session.createConsumer(q1.getName());
|
||||||
|
ClientConsumer consumer2 = session.createConsumer(q2.getName());
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(baseAddress);
|
||||||
|
|
||||||
|
final int num = 10;
|
||||||
|
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
|
ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
|
||||||
|
m.getBodyBuffer().writeString("AnyCast" + i);
|
||||||
|
producer.send(m);
|
||||||
|
}
|
||||||
|
assertNull(consumer1.receive(200));
|
||||||
|
assertNull(consumer2.receive(200));
|
||||||
|
session.commit();
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q1.getMessageCount()));
|
||||||
|
assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num / 2 == q2.getMessageCount()));
|
||||||
|
|
||||||
|
ClientSession session1 = sessionFactory.createSession(false, false);
|
||||||
|
ClientSession session2 = sessionFactory.createSession(false, false);
|
||||||
|
session1.start();
|
||||||
|
session2.start();
|
||||||
|
|
||||||
|
consumer1 = session1.createConsumer(q1.getName());
|
||||||
|
consumer2 = session2.createConsumer(q2.getName());
|
||||||
|
|
||||||
|
ClientConsumer[] consumers = new ClientConsumer[]{consumer1, consumer2};
|
||||||
|
ClientSession[] sessions = new ClientSession[]{session1, session2};
|
||||||
|
Queue[] queues = new Queue[]{q1, q2};
|
||||||
|
|
||||||
|
for (int i = 0; i < consumers.length; i++) {
|
||||||
|
|
||||||
|
for (int j = 0; j < num / 2; j++) {
|
||||||
|
ClientMessage m = consumers[i].receive(2000);
|
||||||
|
assertNotNull(m);
|
||||||
|
System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNull(consumers[i].receive(200));
|
||||||
|
sessions[i].rollback();
|
||||||
|
sessions[i].close();
|
||||||
|
|
||||||
|
sessions[i] = sessionFactory.createSession(false, false);
|
||||||
|
sessions[i].start();
|
||||||
|
|
||||||
|
//receive same after rollback
|
||||||
|
consumers[i] = sessions[i].createConsumer(queues[i].getName());
|
||||||
|
|
||||||
|
for (int j = 0; j < num / 2; j++) {
|
||||||
|
ClientMessage m = consumers[i].receive(2000);
|
||||||
|
assertNotNull(m);
|
||||||
|
System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNull(consumers[i].receive(200));
|
||||||
|
sessions[i].commit();
|
||||||
|
|
||||||
|
assertNull(consumers[i].receive(200));
|
||||||
|
sessions[i].close();
|
||||||
|
}
|
||||||
|
|
||||||
|
q1.deleteQueue();
|
||||||
|
q2.deleteQueue();
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,6 +26,7 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
@ -36,6 +37,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
@ -128,8 +130,10 @@ public class JmsNettyNioStressTest extends ActiveMQTestBase {
|
||||||
// create the 2 queues used in the test
|
// create the 2 queues used in the test
|
||||||
ClientSessionFactory sf = locator.createSessionFactory(transpConf);
|
ClientSessionFactory sf = locator.createSessionFactory(transpConf);
|
||||||
ClientSession session = sf.createTransactedSession();
|
ClientSession session = sf.createTransactedSession();
|
||||||
session.createQueue("queue", "queue");
|
session.createAddress(SimpleString.toSimpleString("queue"), RoutingType.ANYCAST, false);
|
||||||
session.createQueue("queue2", "queue2");
|
session.createAddress(SimpleString.toSimpleString("queue2"), RoutingType.ANYCAST, false);
|
||||||
|
session.createQueue("queue", RoutingType.ANYCAST, "queue");
|
||||||
|
session.createQueue("queue2", RoutingType.ANYCAST, "queue2");
|
||||||
session.commit();
|
session.commit();
|
||||||
sf.close();
|
sf.close();
|
||||||
session.close();
|
session.close();
|
||||||
|
|
|
@ -26,6 +26,7 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
|
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
|
||||||
|
@ -39,7 +40,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void createQueue() throws Exception {
|
public void createQueue() throws Exception {
|
||||||
server.createQueue(SimpleString.toSimpleString("queue1"), SimpleString.toSimpleString("queue1"), null, true, false);
|
server.createQueue(SimpleString.toSimpleString("queue1"), RoutingType.ANYCAST, SimpleString.toSimpleString("queue1"), null, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
|
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
|
||||||
|
@ -198,7 +199,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
SimpleString jmsQueueName = new SimpleString("myqueue");
|
SimpleString jmsQueueName = new SimpleString("myqueue");
|
||||||
|
|
||||||
coreSession.createQueue(jmsQueueName, jmsQueueName, null, true);
|
coreSession.createQueue(jmsQueueName, RoutingType.ANYCAST, jmsQueueName, null, true);
|
||||||
|
|
||||||
Queue queue = sess.createQueue("myqueue");
|
Queue queue = sess.createQueue("myqueue");
|
||||||
|
|
||||||
|
@ -271,7 +272,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
SimpleString jmsQueueName = new SimpleString("myqueue");
|
SimpleString jmsQueueName = new SimpleString("myqueue");
|
||||||
|
|
||||||
coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, true);
|
coreSessionLive.createQueue(jmsQueueName, RoutingType.ANYCAST, jmsQueueName, null, true);
|
||||||
|
|
||||||
Queue queue = sessLive.createQueue("myqueue");
|
Queue queue = sessLive.createQueue("myqueue");
|
||||||
|
|
||||||
|
@ -377,7 +378,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
coreSession.createQueue(QUEUE, QUEUE, true);
|
coreSession.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, true);
|
||||||
|
|
||||||
Queue queue = sess.createQueue("somequeue");
|
Queue queue = sess.createQueue("somequeue");
|
||||||
|
|
||||||
|
|
|
@ -1096,8 +1096,8 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
javax.jms.Queue queue = s.createQueue(destinationName);
|
javax.jms.Topic topic = s.createTopic(destinationName);
|
||||||
MessageProducer producer = s.createProducer(queue);
|
MessageProducer producer = s.createProducer(topic);
|
||||||
|
|
||||||
// send retained message from JMS
|
// send retained message from JMS
|
||||||
final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
|
final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
|
||||||
|
@ -1626,10 +1626,7 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
SimpleString coreAddress = new SimpleString("foo.bar");
|
SimpleString coreAddress = new SimpleString("foo.bar");
|
||||||
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
|
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
|
||||||
|
|
||||||
AddressInfo addressInfo = new AddressInfo(coreAddress);
|
getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, true);
|
||||||
getServer().createOrUpdateAddressInfo(addressInfo);
|
|
||||||
|
|
||||||
getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, false);
|
|
||||||
|
|
||||||
MQTT mqtt = createMQTTConnection();
|
MQTT mqtt = createMQTTConnection();
|
||||||
mqtt.setClientId(clientId);
|
mqtt.setClientId(clientId);
|
||||||
|
@ -1675,7 +1672,7 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
try {
|
try {
|
||||||
String clientId = "testMqtt";
|
String clientId = "testMqtt";
|
||||||
SimpleString coreAddress = new SimpleString("foo.bar");
|
SimpleString coreAddress = new SimpleString("foo.bar");
|
||||||
getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, false);
|
getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true);
|
||||||
|
|
||||||
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
|
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
@ -692,7 +693,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
||||||
Connection conn1 = null;
|
Connection conn1 = null;
|
||||||
|
|
||||||
SimpleString durableQueue = new SimpleString("exampleQueue");
|
SimpleString durableQueue = new SimpleString("exampleQueue");
|
||||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false);
|
||||||
|
|
||||||
Queue queue = ActiveMQJMSClient.createQueue("exampleQueue");
|
Queue queue = ActiveMQJMSClient.createQueue("exampleQueue");
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class ResourceLimitTest extends ActiveMQTestBase {
|
||||||
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
|
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
|
||||||
securityManager.getConfiguration().addUser("myUser", "password");
|
securityManager.getConfiguration().addUser("myUser", "password");
|
||||||
securityManager.getConfiguration().addRole("myUser", "arole");
|
securityManager.getConfiguration().addRole("myUser", "arole");
|
||||||
Role role = new Role("arole", false, false, false, false, true, true, false, true, false, false);
|
Role role = new Role("arole", false, false, false, false, true, true, false, true, false, true);
|
||||||
Set<Role> roles = new HashSet<>();
|
Set<Role> roles = new HashSet<>();
|
||||||
roles.add(role);
|
roles.add(role);
|
||||||
server.getSecurityRepository().addMatch("#", roles);
|
server.getSecurityRepository().addMatch("#", roles);
|
||||||
|
|
|
@ -149,7 +149,7 @@ public class AbstractAdmin implements Admin {
|
||||||
@Override
|
@Override
|
||||||
public void createTopic(final String name) {
|
public void createTopic(final String name) {
|
||||||
try {
|
try {
|
||||||
invokeSyncOperation(ResourceNames.BROKER, "createAddress", name, new Object[]{"MULTICAST"});
|
invokeSyncOperation(ResourceNames.BROKER, "createAddress", name, "MULTICAST");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IllegalStateException(e);
|
throw new IllegalStateException(e);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue