diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 369dba6775..ba32252990 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -283,6 +283,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void addQueueBinding(long tx, Binding binding) throws Exception; + void updateQueueBinding(long tx, Binding binding) throws Exception; + void deleteQueueBinding(long tx, long queueBindingID) throws Exception; /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 58c86a0a59..d3db9e5468 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1220,8 +1220,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp // BindingsImpl operations + @Override + public void updateQueueBinding(long tx, Binding binding) throws Exception { + internalQueueBinding(true, tx, binding); + } + @Override public void addQueueBinding(final long tx, final Binding binding) throws Exception { + internalQueueBinding(false, tx, binding); + } + + private void internalQueueBinding(boolean update, final long tx, final Binding binding) throws Exception { Queue queue = (Queue) binding.getBindable(); Filter filter = queue.getFilter(); @@ -1232,7 +1241,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp readLock(); try { - bindingsJournal.appendAddRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding); + + if (update) { + System.out.println("Update " + binding.getID()); + bindingsJournal.appendUpdateRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding); + } else { + System.out.println("Adding " + binding.getID()); + bindingsJournal.appendAddRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding); + } } finally { readUnLock(); } @@ -1402,7 +1418,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) { PersistentQueueBindingEncoding bindingEncoding = newQueueBindingEncoding(id, buffer); - queueBindingInfos.add(bindingEncoding); mapBindings.put(bindingEncoding.getId(), bindingEncoding); } else if (rec == JournalRecordIds.ID_COUNTER_RECORD) { idGenerator.loadState(record.id, buffer); @@ -1434,6 +1449,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } } + for (PersistentQueueBindingEncoding queue : mapBindings.values()) { + queueBindingInfos.add(queue); + } + mapBindings.clear(); // just to give a hand to GC // This will instruct the IDGenerator to beforeStop old records diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 2c297d905c..32f901082f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -91,6 +91,10 @@ public class NullStorageManager implements StorageManager { return 0; } + @Override + public void updateQueueBinding(long tx, Binding binding) throws Exception { + } + @Override public void deleteQueueStatus(long recordID) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index a64394a204..3e5ede5b91 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -437,6 +437,24 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + /** used on update queue, to validate when a value needs update */ + private static int replaceNull(Integer value) { + if (value == null) { + return -1; + } else { + return value.intValue(); + } + } + + /** used on update queue, to validate when a value needs update */ + private static boolean replaceNull(Boolean value) { + if (value == null) { + return false; + } else { + return value.booleanValue(); + } + } + @Override public QueueBinding updateQueue(SimpleString name, RoutingType routingType, @@ -447,8 +465,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (queueBinding == null) { return null; } + final Queue queue = queueBinding.getQueue(); - //TODO put the whole update logic on Queue + + if (queue.getRoutingType() == routingType && replaceNull(maxConsumers) == replaceNull(queue.getMaxConsumers()) && queue.isPurgeOnNoConsumers() == replaceNull(purgeOnNoConsumers)) { + + if (logger.isTraceEnabled()) { + logger.tracef("Queue " + name + " didn't need to be updated"); + } + return queueBinding; + } + //validate update if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) { final int consumerCount = queue.getConsumerCount(); @@ -464,6 +491,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes); } } + //atomic update if (maxConsumers != null) { queue.setMaxConsumer(maxConsumers); @@ -474,6 +502,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (purgeOnNoConsumers != null) { queue.setPurgeOnNoConsumers(purgeOnNoConsumers); } + + final long txID = storageManager.generateID(); + try { + storageManager.updateQueueBinding(txID, queueBinding); + storageManager.commitBindings(txID); + } catch (Throwable throwable) { + storageManager.rollback(txID); + logger.warn(throwable.getMessage(), throwable); + throw throwable; + } + return queueBinding; } } @@ -518,7 +557,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return queues; } - // TODO - needs to be synchronized to prevent happening concurrently with activate() // (and possible removeBinding and other methods) // Otherwise can have situation where createQueue comes in before failover, then failover occurs @@ -662,15 +700,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public RoutingStatus route(final Message message, - final boolean direct) throws Exception { + public RoutingStatus route(final Message message, final boolean direct) throws Exception { return route(message, (Transaction) null, direct); } @Override - public RoutingStatus route(final Message message, - final Transaction tx, - final boolean direct) throws Exception { + public RoutingStatus route(final Message message, final Transaction tx, final boolean direct) throws Exception { return route(message, new RoutingContextImpl(tx), direct); } @@ -721,11 +756,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // first check for the auto-queue creation thing if (bindings == null) { // There is no queue with this address, we will check if it needs to be created -// if (queueCreator.create(address)) { - // TODO: this is not working!!!! - // reassign bindings if it was created -// bindings = addressManager.getBindingsForRoutingAddress(address); -// } + // if (queueCreator.create(address)) { + // TODO: this is not working!!!! + // reassign bindings if it was created + // bindings = addressManager.getBindingsForRoutingAddress(address); + // } } if (bindings != null) { @@ -786,8 +821,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null); processRoute(message, context, direct); final RoutingStatus finalResult = result; - server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, - rejectDuplicates, finalResult) : null); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalResult) : null); } catch (ActiveMQAddressFullException e) { if (startedTX.get()) { context.getTransaction().rollback(); @@ -818,9 +852,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public MessageReference reroute(final Message message, - final Queue queue, - final Transaction tx) throws Exception { + public MessageReference reroute(final Message message, final Queue queue, final Transaction tx) throws Exception { setPagingStore(message); @@ -853,8 +885,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding */ @Override public Pair redistribute(final Message message, - final Queue originatingQueue, - final Transaction tx) throws Exception { + final Queue originatingQueue, + final Transaction tx) throws Exception { // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message // arrived the target node // as described on https://issues.jboss.org/browse/JBPAPP-6130 @@ -912,7 +944,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception { + public SimpleString getMatchingQueue(SimpleString address, + SimpleString queueName, + RoutingType routingType) throws Exception { return addressManager.getMatchingQueue(address, queueName, routingType); } @@ -1006,16 +1040,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // Private ----------------------------------------------------------------- - private void setPagingStore(final Message message) throws Exception { PagingStore store = pagingManager.getPageStore(message.getAddressSimpleString()); message.setContext(store); } - private void routeQueueInfo(final Message message, - final Queue queue, - final boolean applyFilters) throws Exception { + private void routeQueueInfo(final Message message, final Queue queue, final boolean applyFilters) throws Exception { if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message)) { RoutingContext context = new RoutingContextImpl(null); @@ -1098,7 +1129,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - if (deliveryTime != null) { reference.setScheduledDeliveryTime(deliveryTime); } @@ -1127,7 +1157,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext()); } - if (deliveryTime > 0) { if (tx != null) { storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 4093e217a1..13f69653ff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2809,20 +2809,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { RoutingType routingType, Integer maxConsumers, Boolean purgeOnNoConsumers) throws Exception { + final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers); if (queueBinding != null) { final Queue queue = queueBinding.getQueue(); - if (queue.isDurable()) { - final long txID = storageManager.generateID(); - try { - storageManager.deleteQueueBinding(txID, queueBinding.getID()); - storageManager.addQueueBinding(txID, queueBinding); - storageManager.commitBindings(txID); - } catch (Throwable throwable) { - storageManager.rollbackBindings(txID); - throw throwable; - } - } return queue; } else { return null; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index b1ea206d3c..b256eb9127 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -229,6 +229,11 @@ public class TransactionImplTest extends ActiveMQTestBase { } + @Override + public void updateQueueBinding(long tx, Binding binding) throws Exception { + + } + @Override public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index a09843635d..d37d1346d3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -270,6 +270,11 @@ public class SendAckFailTest extends ActiveMQTestBase { manager.stop(); } + @Override + public void updateQueueBinding(long tx, Binding binding) throws Exception { + manager.updateQueueBinding(tx, binding); + } + @Override public boolean isStarted() { return manager.isStarted(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java new file mode 100644 index 0000000000..542fe33f0f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class UpdateQueueTest extends ActiveMQTestBase { + + @Test + public void testUpdateQueue() throws Exception { + ActiveMQServer server = createServer(true, true); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + + server.start(); + + + SimpleString ADDRESS = SimpleString.toSimpleString("queue.0"); + + server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, + null, true, false); + + Connection conn = factory.createConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod = session.createProducer(session.createQueue(ADDRESS.toString())); + + for (int i = 0; i < 100; i++) { + prod.send(session.createTextMessage("message " + i)); + } + + server.updateQueue(ADDRESS.toString(), RoutingType.ANYCAST, 1, false); + + conn.close(); + factory.close(); + + server.stop(); + + server.start(); + + Queue queue = server.locateQueue(ADDRESS); + + Assert.assertNotNull("queue not found", queue); + + factory = new ActiveMQConnectionFactory(); + + conn = factory.createConnection(); + session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(session.createQueue(ADDRESS.toString())); + + conn.start(); + for (int i = 0; i < 100; i++) { + Assert.assertNotNull(consumer.receive(5000)); + } + + Assert.assertNull(consumer.receiveNoWait()); + + Assert.assertEquals(1, queue.getMaxConsumers()); + + conn.close(); + + server.stop(); + + } +}