ARTEMIS-1378 Update Address is broken, may lose messages
This commit is contained in:
parent
9df2ffe248
commit
0df12657af
|
@ -283,6 +283,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
||||||
|
|
||||||
void addQueueBinding(long tx, Binding binding) throws Exception;
|
void addQueueBinding(long tx, Binding binding) throws Exception;
|
||||||
|
|
||||||
|
void updateQueueBinding(long tx, Binding binding) throws Exception;
|
||||||
|
|
||||||
void deleteQueueBinding(long tx, long queueBindingID) throws Exception;
|
void deleteQueueBinding(long tx, long queueBindingID) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1220,8 +1220,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
|
|
||||||
// BindingsImpl operations
|
// BindingsImpl operations
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateQueueBinding(long tx, Binding binding) throws Exception {
|
||||||
|
internalQueueBinding(true, tx, binding);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addQueueBinding(final long tx, final Binding binding) throws Exception {
|
public void addQueueBinding(final long tx, final Binding binding) throws Exception {
|
||||||
|
internalQueueBinding(false, tx, binding);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void internalQueueBinding(boolean update, final long tx, final Binding binding) throws Exception {
|
||||||
Queue queue = (Queue) binding.getBindable();
|
Queue queue = (Queue) binding.getBindable();
|
||||||
|
|
||||||
Filter filter = queue.getFilter();
|
Filter filter = queue.getFilter();
|
||||||
|
@ -1232,7 +1241,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
|
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
bindingsJournal.appendAddRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding);
|
|
||||||
|
if (update) {
|
||||||
|
System.out.println("Update " + binding.getID());
|
||||||
|
bindingsJournal.appendUpdateRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding);
|
||||||
|
} else {
|
||||||
|
System.out.println("Adding " + binding.getID());
|
||||||
|
bindingsJournal.appendAddRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
readUnLock();
|
readUnLock();
|
||||||
}
|
}
|
||||||
|
@ -1402,7 +1418,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
|
|
||||||
if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) {
|
if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) {
|
||||||
PersistentQueueBindingEncoding bindingEncoding = newQueueBindingEncoding(id, buffer);
|
PersistentQueueBindingEncoding bindingEncoding = newQueueBindingEncoding(id, buffer);
|
||||||
queueBindingInfos.add(bindingEncoding);
|
|
||||||
mapBindings.put(bindingEncoding.getId(), bindingEncoding);
|
mapBindings.put(bindingEncoding.getId(), bindingEncoding);
|
||||||
} else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
|
} else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
|
||||||
idGenerator.loadState(record.id, buffer);
|
idGenerator.loadState(record.id, buffer);
|
||||||
|
@ -1434,6 +1449,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (PersistentQueueBindingEncoding queue : mapBindings.values()) {
|
||||||
|
queueBindingInfos.add(queue);
|
||||||
|
}
|
||||||
|
|
||||||
mapBindings.clear(); // just to give a hand to GC
|
mapBindings.clear(); // just to give a hand to GC
|
||||||
|
|
||||||
// This will instruct the IDGenerator to beforeStop old records
|
// This will instruct the IDGenerator to beforeStop old records
|
||||||
|
|
|
@ -91,6 +91,10 @@ public class NullStorageManager implements StorageManager {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateQueueBinding(long tx, Binding binding) throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteQueueStatus(long recordID) throws Exception {
|
public void deleteQueueStatus(long recordID) throws Exception {
|
||||||
|
|
||||||
|
|
|
@ -437,6 +437,24 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** used on update queue, to validate when a value needs update */
|
||||||
|
private static int replaceNull(Integer value) {
|
||||||
|
if (value == null) {
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
return value.intValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** used on update queue, to validate when a value needs update */
|
||||||
|
private static boolean replaceNull(Boolean value) {
|
||||||
|
if (value == null) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return value.booleanValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public QueueBinding updateQueue(SimpleString name,
|
public QueueBinding updateQueue(SimpleString name,
|
||||||
RoutingType routingType,
|
RoutingType routingType,
|
||||||
|
@ -447,8 +465,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
if (queueBinding == null) {
|
if (queueBinding == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Queue queue = queueBinding.getQueue();
|
final Queue queue = queueBinding.getQueue();
|
||||||
//TODO put the whole update logic on Queue
|
|
||||||
|
if (queue.getRoutingType() == routingType && replaceNull(maxConsumers) == replaceNull(queue.getMaxConsumers()) && queue.isPurgeOnNoConsumers() == replaceNull(purgeOnNoConsumers)) {
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.tracef("Queue " + name + " didn't need to be updated");
|
||||||
|
}
|
||||||
|
return queueBinding;
|
||||||
|
}
|
||||||
|
|
||||||
//validate update
|
//validate update
|
||||||
if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
|
if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
|
||||||
final int consumerCount = queue.getConsumerCount();
|
final int consumerCount = queue.getConsumerCount();
|
||||||
|
@ -464,6 +491,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
|
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//atomic update
|
//atomic update
|
||||||
if (maxConsumers != null) {
|
if (maxConsumers != null) {
|
||||||
queue.setMaxConsumer(maxConsumers);
|
queue.setMaxConsumer(maxConsumers);
|
||||||
|
@ -474,6 +502,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
if (purgeOnNoConsumers != null) {
|
if (purgeOnNoConsumers != null) {
|
||||||
queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
|
queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final long txID = storageManager.generateID();
|
||||||
|
try {
|
||||||
|
storageManager.updateQueueBinding(txID, queueBinding);
|
||||||
|
storageManager.commitBindings(txID);
|
||||||
|
} catch (Throwable throwable) {
|
||||||
|
storageManager.rollback(txID);
|
||||||
|
logger.warn(throwable.getMessage(), throwable);
|
||||||
|
throw throwable;
|
||||||
|
}
|
||||||
|
|
||||||
return queueBinding;
|
return queueBinding;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -518,7 +557,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
return queues;
|
return queues;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// TODO - needs to be synchronized to prevent happening concurrently with activate()
|
// TODO - needs to be synchronized to prevent happening concurrently with activate()
|
||||||
// (and possible removeBinding and other methods)
|
// (and possible removeBinding and other methods)
|
||||||
// Otherwise can have situation where createQueue comes in before failover, then failover occurs
|
// Otherwise can have situation where createQueue comes in before failover, then failover occurs
|
||||||
|
@ -662,15 +700,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RoutingStatus route(final Message message,
|
public RoutingStatus route(final Message message, final boolean direct) throws Exception {
|
||||||
final boolean direct) throws Exception {
|
|
||||||
return route(message, (Transaction) null, direct);
|
return route(message, (Transaction) null, direct);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RoutingStatus route(final Message message,
|
public RoutingStatus route(final Message message, final Transaction tx, final boolean direct) throws Exception {
|
||||||
final Transaction tx,
|
|
||||||
final boolean direct) throws Exception {
|
|
||||||
return route(message, new RoutingContextImpl(tx), direct);
|
return route(message, new RoutingContextImpl(tx), direct);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -721,11 +756,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
// first check for the auto-queue creation thing
|
// first check for the auto-queue creation thing
|
||||||
if (bindings == null) {
|
if (bindings == null) {
|
||||||
// There is no queue with this address, we will check if it needs to be created
|
// There is no queue with this address, we will check if it needs to be created
|
||||||
// if (queueCreator.create(address)) {
|
// if (queueCreator.create(address)) {
|
||||||
// TODO: this is not working!!!!
|
// TODO: this is not working!!!!
|
||||||
// reassign bindings if it was created
|
// reassign bindings if it was created
|
||||||
// bindings = addressManager.getBindingsForRoutingAddress(address);
|
// bindings = addressManager.getBindingsForRoutingAddress(address);
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bindings != null) {
|
if (bindings != null) {
|
||||||
|
@ -786,8 +821,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
|
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
|
||||||
processRoute(message, context, direct);
|
processRoute(message, context, direct);
|
||||||
final RoutingStatus finalResult = result;
|
final RoutingStatus finalResult = result;
|
||||||
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct,
|
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalResult) : null);
|
||||||
rejectDuplicates, finalResult) : null);
|
|
||||||
} catch (ActiveMQAddressFullException e) {
|
} catch (ActiveMQAddressFullException e) {
|
||||||
if (startedTX.get()) {
|
if (startedTX.get()) {
|
||||||
context.getTransaction().rollback();
|
context.getTransaction().rollback();
|
||||||
|
@ -818,9 +852,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessageReference reroute(final Message message,
|
public MessageReference reroute(final Message message, final Queue queue, final Transaction tx) throws Exception {
|
||||||
final Queue queue,
|
|
||||||
final Transaction tx) throws Exception {
|
|
||||||
|
|
||||||
setPagingStore(message);
|
setPagingStore(message);
|
||||||
|
|
||||||
|
@ -853,8 +885,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Pair<RoutingContext, Message> redistribute(final Message message,
|
public Pair<RoutingContext, Message> redistribute(final Message message,
|
||||||
final Queue originatingQueue,
|
final Queue originatingQueue,
|
||||||
final Transaction tx) throws Exception {
|
final Transaction tx) throws Exception {
|
||||||
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
|
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
|
||||||
// arrived the target node
|
// arrived the target node
|
||||||
// as described on https://issues.jboss.org/browse/JBPAPP-6130
|
// as described on https://issues.jboss.org/browse/JBPAPP-6130
|
||||||
|
@ -912,7 +944,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
|
public SimpleString getMatchingQueue(SimpleString address,
|
||||||
|
SimpleString queueName,
|
||||||
|
RoutingType routingType) throws Exception {
|
||||||
return addressManager.getMatchingQueue(address, queueName, routingType);
|
return addressManager.getMatchingQueue(address, queueName, routingType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1006,16 +1040,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
|
|
||||||
// Private -----------------------------------------------------------------
|
// Private -----------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
private void setPagingStore(final Message message) throws Exception {
|
private void setPagingStore(final Message message) throws Exception {
|
||||||
PagingStore store = pagingManager.getPageStore(message.getAddressSimpleString());
|
PagingStore store = pagingManager.getPageStore(message.getAddressSimpleString());
|
||||||
|
|
||||||
message.setContext(store);
|
message.setContext(store);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void routeQueueInfo(final Message message,
|
private void routeQueueInfo(final Message message, final Queue queue, final boolean applyFilters) throws Exception {
|
||||||
final Queue queue,
|
|
||||||
final boolean applyFilters) throws Exception {
|
|
||||||
if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message)) {
|
if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message)) {
|
||||||
RoutingContext context = new RoutingContextImpl(null);
|
RoutingContext context = new RoutingContextImpl(null);
|
||||||
|
|
||||||
|
@ -1098,7 +1129,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (deliveryTime != null) {
|
if (deliveryTime != null) {
|
||||||
reference.setScheduledDeliveryTime(deliveryTime);
|
reference.setScheduledDeliveryTime(deliveryTime);
|
||||||
}
|
}
|
||||||
|
@ -1127,7 +1157,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
|
storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (deliveryTime > 0) {
|
if (deliveryTime > 0) {
|
||||||
if (tx != null) {
|
if (tx != null) {
|
||||||
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
|
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
|
||||||
|
|
|
@ -2809,20 +2809,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
RoutingType routingType,
|
RoutingType routingType,
|
||||||
Integer maxConsumers,
|
Integer maxConsumers,
|
||||||
Boolean purgeOnNoConsumers) throws Exception {
|
Boolean purgeOnNoConsumers) throws Exception {
|
||||||
|
|
||||||
final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers);
|
final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers);
|
||||||
if (queueBinding != null) {
|
if (queueBinding != null) {
|
||||||
final Queue queue = queueBinding.getQueue();
|
final Queue queue = queueBinding.getQueue();
|
||||||
if (queue.isDurable()) {
|
|
||||||
final long txID = storageManager.generateID();
|
|
||||||
try {
|
|
||||||
storageManager.deleteQueueBinding(txID, queueBinding.getID());
|
|
||||||
storageManager.addQueueBinding(txID, queueBinding);
|
|
||||||
storageManager.commitBindings(txID);
|
|
||||||
} catch (Throwable throwable) {
|
|
||||||
storageManager.rollbackBindings(txID);
|
|
||||||
throw throwable;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return queue;
|
return queue;
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -229,6 +229,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateQueueBinding(long tx, Binding binding) throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
|
public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
|
||||||
|
|
||||||
|
|
|
@ -270,6 +270,11 @@ public class SendAckFailTest extends ActiveMQTestBase {
|
||||||
manager.stop();
|
manager.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateQueueBinding(long tx, Binding binding) throws Exception {
|
||||||
|
manager.updateQueueBinding(tx, binding);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isStarted() {
|
public boolean isStarted() {
|
||||||
return manager.isStarted();
|
return manager.isStarted();
|
||||||
|
|
|
@ -0,0 +1,92 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class UpdateQueueTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateQueue() throws Exception {
|
||||||
|
ActiveMQServer server = createServer(true, true);
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
|
||||||
|
SimpleString ADDRESS = SimpleString.toSimpleString("queue.0");
|
||||||
|
|
||||||
|
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null,
|
||||||
|
null, true, false);
|
||||||
|
|
||||||
|
Connection conn = factory.createConnection();
|
||||||
|
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer prod = session.createProducer(session.createQueue(ADDRESS.toString()));
|
||||||
|
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
prod.send(session.createTextMessage("message " + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
server.updateQueue(ADDRESS.toString(), RoutingType.ANYCAST, 1, false);
|
||||||
|
|
||||||
|
conn.close();
|
||||||
|
factory.close();
|
||||||
|
|
||||||
|
server.stop();
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
Queue queue = server.locateQueue(ADDRESS);
|
||||||
|
|
||||||
|
Assert.assertNotNull("queue not found", queue);
|
||||||
|
|
||||||
|
factory = new ActiveMQConnectionFactory();
|
||||||
|
|
||||||
|
conn = factory.createConnection();
|
||||||
|
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(session.createQueue(ADDRESS.toString()));
|
||||||
|
|
||||||
|
conn.start();
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
Assert.assertNotNull(consumer.receive(5000));
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertNull(consumer.receiveNoWait());
|
||||||
|
|
||||||
|
Assert.assertEquals(1, queue.getMaxConsumers());
|
||||||
|
|
||||||
|
conn.close();
|
||||||
|
|
||||||
|
server.stop();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue