From a074f9f1a5f0541a83dd2500427022eb6e63c8f9 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 21 Oct 2016 14:59:52 -0400 Subject: [PATCH] ARTEMIS-753 Queue Pause and Resumed persisted --- .../api/core/management/QueueControl.java | 8 +- .../api/jms/management/JMSQueueControl.java | 6 ++ .../management/impl/JMSQueueControlImpl.java | 5 ++ .../management/impl/QueueControlImpl.java | 12 +++ .../core/persistence/QueueBindingInfo.java | 7 ++ .../artemis/core/persistence/QueueStatus.java | 46 ++++++++++++ .../core/persistence/StorageManager.java | 11 +++ .../AbstractJournalStorageManager.java | 66 +++++++++++++++- .../impl/journal/DescribeJournal.java | 4 + .../impl/journal/JournalRecordIds.java | 2 + .../codec/PersistentQueueBindingEncoding.java | 18 +++++ .../impl/journal/codec/QueueEncoding.java | 3 +- .../journal/codec/QueueStatusEncoding.java | 75 +++++++++++++++++++ .../impl/nullpm/NullStorageManager.java | 11 +++ .../activemq/artemis/core/server/Queue.java | 15 ++++ .../server/impl/PostOfficeJournalLoader.java | 13 +++- .../artemis/core/server/impl/QueueImpl.java | 41 ++++++++++ .../impl/ScheduledDeliveryHandlerTest.java | 13 ++++ .../transaction/impl/TransactionImplTest.java | 11 +++ .../JMSQueueControlUsingJMSTest.java | 5 ++ .../management/QueueControlUsingCoreTest.java | 5 ++ .../server/QueuePeristPauseTest.java | 60 +++++++++++++++ .../unit/core/postoffice/impl/FakeQueue.java | 15 ++++ 23 files changed, 448 insertions(+), 4 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 0a74d1c4a4..3336aaedc0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -410,10 +410,16 @@ public interface QueueControl { @Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION) void pause() throws Exception; + /** + * Pauses the queue. Messages are no longer delivered to its consumers. + */ + @Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION) + void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception; + /** * Resumes the queue. Messages are again delivered to its consumers. */ - @Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state.", impact = MBeanOperationInfo.ACTION) + @Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state. It will also affected the state of a persisted pause.", impact = MBeanOperationInfo.ACTION) void resume() throws Exception; @Operation(desc = "List all the existent consumers on the Queue") diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java index 56a127cdcb..3a4101a6b9 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java @@ -369,6 +369,12 @@ public interface JMSQueueControl extends DestinationControl { @Operation(desc = "Pause the queue.", impact = MBeanOperationInfo.ACTION) void pause() throws Exception; + /** + * Pauses the queue. Messages are no longer delivered to its consumers. + */ + @Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION) + void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception; + /** * Returns whether the queue is paused. */ diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java index a83614661d..36cba961a7 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java @@ -473,6 +473,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro coreQueueControl.pause(); } + @Override + public void pause(boolean persist) throws Exception { + coreQueueControl.pause(persist); + } + @Override public void resume() throws Exception { coreQueueControl.resume(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 7275ea4fb0..cfa8aa5746 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -848,6 +848,18 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + + @Override + public void pause(boolean persist) { + checkStarted(); + + clearIO(); + try { + queue.pause(persist); + } finally { + blockOnIO(); + } + } @Override public void resume() { checkStarted(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java index c05b86c77f..8c80a8a520 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.artemis.core.persistence; +import java.util.List; + import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding; public interface QueueBindingInfo { @@ -39,4 +42,8 @@ public interface QueueBindingInfo { SimpleString getUser(); + void addQueueStatusEncoding(QueueStatusEncoding status); + + List getQueueStatusEncodings(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java new file mode 100644 index 0000000000..18fa9b9d17 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.persistence; + +public enum QueueStatus { + PAUSED((short) 0), RUNNING((short) 1); + + public final short id; + + QueueStatus(short id) { + this.id = id; + } + + public static QueueStatus[] values; + + static { + QueueStatus[] allValues = QueueStatus.values(); + values = new QueueStatus[allValues.length]; + for (QueueStatus v : allValues) { + values[v.id] = v; + } + } + + public static QueueStatus fromID(short id) { + if (id < 0 || id > values.length) { + return null; + } else { + return values[id]; + } + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index e82066416a..bbfec148c1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -287,6 +287,17 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void deleteQueueBinding(long tx, long queueBindingID) throws Exception; + /** + * + * @param queueID The id of the queue + * @param status The current status of the queue. (Reserved for future use, ATM we only use this record for PAUSED) + * @return the id of the journal + * @throws Exception + */ + long storeQueueStatus(long queueID, QueueStatus status) throws Exception; + + void deleteQueueStatus(long recordID) throws Exception; + JournalLoadInformation loadBindingJournal(List queueBindingInfos, List groupingInfos) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index cd86191aa6..a6938d6767 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; +import org.apache.activemq.artemis.core.persistence.QueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; @@ -81,6 +82,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCount import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.XidEncoding; @@ -105,6 +107,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.IDGenerator; +import org.jboss.logging.Logger; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; @@ -122,6 +125,8 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR */ public abstract class AbstractJournalStorageManager implements StorageManager { + private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class); + public enum JournalContent { BINDINGS((byte) 0), MESSAGES((byte) 1); @@ -1236,6 +1241,32 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } } + @Override + public long storeQueueStatus(long queueID, QueueStatus status) throws Exception { + long recordID = idGenerator.generateID(); + + readLock(); + try { + bindingsJournal.appendAddRecord(recordID, JournalRecordIds.QUEUE_STATUS_RECORD, new QueueStatusEncoding(queueID, status), true); + } finally { + readUnLock(); + } + + + return recordID; + } + + @Override + public void deleteQueueStatus(long recordID) throws Exception { + readLock(); + try { + bindingsJournal.appendDeleteRecord(recordID, true); + } finally { + readUnLock(); + } + + } + @Override public long storePageCounterInc(long txID, long queueID, int value) throws Exception { readLock(); @@ -1326,6 +1357,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager { JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null); + HashMap mapBindings = new HashMap<>(); + for (RecordInfo record : records) { long id = record.id; @@ -1337,6 +1370,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { PersistentQueueBindingEncoding bindingEncoding = newBindingEncoding(id, buffer); queueBindingInfos.add(bindingEncoding); + mapBindings.put(bindingEncoding.getId(), bindingEncoding); } else if (rec == JournalRecordIds.ID_COUNTER_RECORD) { idGenerator.loadState(record.id, buffer); } else if (rec == JournalRecordIds.GROUP_RECORD) { @@ -1348,11 +1382,24 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } else if (rec == JournalRecordIds.SECURITY_RECORD) { PersistedRoles roles = newSecurityRecord(id, buffer); mapPersistedRoles.put(roles.getAddressMatch(), roles); + } else if (rec == JournalRecordIds.QUEUE_STATUS_RECORD) { + QueueStatusEncoding statusEncoding = newQueueStatusEncoding(id, buffer); + PersistentQueueBindingEncoding queueBindingEncoding = mapBindings.get(statusEncoding.queueID); + if (queueBindingEncoding != null) { + queueBindingEncoding.addQueueStatusEncoding(statusEncoding); + } else { + // unlikely to happen, so I didn't bother about the Logger method + logger.info("There is no queue with ID " + statusEncoding.queueID + ", deleting record " + statusEncoding.getId()); + this.deleteQueueStatus(statusEncoding.getId()); + } } else { - throw new IllegalStateException("Invalid record type " + rec); + // unlikely to happen + logger.warn("Invalid record type " + rec, new Exception("invalid record type " + rec)); } } + mapBindings.clear(); // just to give a hand to GC + // This will instruct the IDGenerator to beforeStop old records idGenerator.cleanup(); @@ -1821,6 +1868,23 @@ public abstract class AbstractJournalStorageManager implements StorageManager { return bindingEncoding; } + /** + * @param id + * @param buffer + * @return + */ + protected static QueueStatusEncoding newQueueStatusEncoding(long id, ActiveMQBuffer buffer) { + QueueStatusEncoding statusEncoding = new QueueStatusEncoding(); + + statusEncoding.decode(buffer); + statusEncoding.setId(id); + + return statusEncoding; + } + + + + @Override public boolean addToPage(PagingStore store, ServerMessage msg, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 34a47bfb16..58723c6982 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -74,6 +74,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT; @@ -550,6 +551,9 @@ public final class DescribeJournal { return encoding; } + case QUEUE_STATUS_RECORD: + return AbstractJournalStorageManager.newQueueStatusEncoding(id, buffer); + case QUEUE_BINDING_RECORD: return AbstractJournalStorageManager.newBindingEncoding(id, buffer); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java index 4aa470b11b..0169f38a36 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java @@ -33,6 +33,8 @@ public final class JournalRecordIds { public static final byte QUEUE_BINDING_RECORD = 21; + public static final byte QUEUE_STATUS_RECORD = 22; + /** * Records storing the current recordID number. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java index 4efe2926ee..039460cfb1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.core.persistence.impl.journal.codec; +import java.util.LinkedList; +import java.util.List; + import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.journal.EncodingSupport; @@ -36,6 +39,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin public SimpleString user; + public List queueStatusEncodings; + public PersistentQueueBindingEncoding() { } @@ -106,6 +111,19 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin return autoCreated; } + @Override + public void addQueueStatusEncoding(QueueStatusEncoding status) { + if (queueStatusEncodings == null) { + queueStatusEncodings = new LinkedList<>(); + } + queueStatusEncodings.add(status); + } + + @Override + public List getQueueStatusEncodings() { + return queueStatusEncodings; + } + @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java index 1e05195a21..9220509b19 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.utils.DataConstants; public class QueueEncoding implements EncodingSupport { @@ -44,7 +45,7 @@ public class QueueEncoding implements EncodingSupport { @Override public int getEncodeSize() { - return 8; + return DataConstants.SIZE_LONG; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java new file mode 100644 index 0000000000..fe2b1f5b28 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.persistence.impl.journal.codec; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.persistence.QueueStatus; +import org.apache.activemq.artemis.utils.DataConstants; + +public class QueueStatusEncoding extends QueueEncoding { + + private QueueStatus status; + + private long id; + + public QueueStatusEncoding(long queueID, QueueStatus status) { + super(queueID); + this.status = status; + } + + public QueueStatusEncoding() { + super(); + } + + @Override + public void decode(final ActiveMQBuffer buffer) { + super.decode(buffer); + short shortStatus = buffer.readShort(); + this.status = QueueStatus.fromID(shortStatus); + } + + @Override + public void encode(final ActiveMQBuffer buffer) { + super.encode(buffer); + buffer.writeShort(status.id); + } + + public QueueStatus getStatus() { + return status; + } + + public long getId() { + return id; + } + + public QueueStatusEncoding setId(long id) { + this.id = id; + return this; + } + + @Override + public int getEncodeSize() { + return super.getEncodeSize() + DataConstants.SIZE_SHORT; + } + + @Override + public String toString() { + return "QueueStatusEncoding [id=" + id + ", queueID=" + queueID + ", status=" + status + "]"; + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index f13d2faa73..3a2999ee01 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; +import org.apache.activemq.artemis.core.persistence.QueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; @@ -84,6 +85,16 @@ public class NullStorageManager implements StorageManager { } + @Override + public long storeQueueStatus(long queueID, QueueStatus status) throws Exception { + return 0; + } + + @Override + public void deleteQueueStatus(long recordID) throws Exception { + + } + @Override public void injectMonitor(FileStoreMonitor monitor) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index a9a87c3f68..0dcef3d034 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -206,6 +206,15 @@ public interface Queue extends Bindable { */ void pause(); + /** + * Pauses the queue. It will receive messages but won't give them to the consumers until resumed. + * If a queue is paused, pausing it again will only throw a warning. + * To check if a queue is paused, invoke isPaused() + */ + void pause(boolean persist); + + void reloadPause(long recordID); + /** * Resumes the delivery of message for the queue. * If a queue is resumed, resuming it again will only throw a warning. @@ -218,6 +227,12 @@ public interface Queue extends Bindable { */ boolean isPaused(); + /** + * if the pause was persisted + * @return + */ + boolean isPersistedPause(); + Executor getExecutor(); void resetAllIterators(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index ccb00cb313..9a8ae74552 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -39,9 +39,11 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; +import org.apache.activemq.artemis.core.persistence.QueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; @@ -146,6 +148,13 @@ public class PostOfficeJournalLoader implements JournalLoader { queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName())); } + if (queueBindingInfo.getQueueStatusEncodings() != null) { + for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) { + if (encoding.getStatus() == QueueStatus.PAUSED) + queue.reloadPause(encoding.getId()); + } + } + final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); queues.put(queue.getID(), queue); postOffice.addBinding(binding); @@ -245,7 +254,9 @@ public class PostOfficeJournalLoader implements JournalLoader { ResourceManager resourceManager, Map>> duplicateIDMap) throws Exception { for (Queue queue : queues.values()) { - queue.resume(); + if (!queue.isPersistedPause()) { + queue.resume(); + } } if (System.getProperty("org.apache.activemq.opt.directblast") != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 7c8ad0aff6..d30544fbbd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; +import org.apache.activemq.artemis.core.persistence.QueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -176,6 +177,8 @@ public class QueueImpl implements Queue { private boolean paused; + private long pauseStatusRecord = -1; + private static final int MAX_SCHEDULED_RUNNERS = 2; // We don't ever need more than two DeliverRunner on the executor's list @@ -1718,8 +1721,32 @@ public class QueueImpl implements Queue { @Override public synchronized void pause() { + pause(false); + } + + @Override + public synchronized void reloadPause(long recordID) { + this.paused = true; + if (pauseStatusRecord >= 0) { + try { + storageManager.deleteQueueStatus(pauseStatusRecord); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + this.pauseStatusRecord = recordID; + } + + @Override + public synchronized void pause(boolean persist) { try { this.flushDeliveriesInTransit(); + if (persist && isDurable()) { + if (pauseStatusRecord >= 0) { + storageManager.deleteQueueStatus(pauseStatusRecord); + } + pauseStatusRecord = storageManager.storeQueueStatus(this.id, QueueStatus.PAUSED); + } } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } @@ -1730,6 +1757,15 @@ public class QueueImpl implements Queue { public synchronized void resume() { paused = false; + if (pauseStatusRecord >= 0) { + try { + storageManager.deleteQueueStatus(pauseStatusRecord); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } + pauseStatusRecord = -1; + } + deliverAsync(); } @@ -1738,6 +1774,11 @@ public class QueueImpl implements Queue { return paused; } + @Override + public synchronized boolean isPersistedPause() { + return this.pauseStatusRecord >= 0; + } + @Override public boolean isDirectDeliver() { return directDeliver; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 91e5e7af43..d82f7d3b77 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -845,6 +845,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { this.expectedElements = new CountDownLatch(expectedElements); } + @Override + public boolean isPersistedPause() { + return false; + } + public boolean waitCompletion(long timeout, TimeUnit timeUnit) throws Exception { return expectedElements.await(timeout, timeUnit); } @@ -862,6 +867,14 @@ public class ScheduledDeliveryHandlerTest extends Assert { return 0; } + @Override + public void pause(boolean persist) { + } + + @Override + public void reloadPause(long recordID) { + } + @Override public Filter getFilter() { return null; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 0c2fec550e..93c5c9dd4a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; +import org.apache.activemq.artemis.core.persistence.QueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; @@ -242,6 +243,16 @@ public class TransactionImplTest extends ActiveMQTestBase { } + @Override + public long storeQueueStatus(long queueID, QueueStatus status) throws Exception { + return 0; + } + + @Override + public void deleteQueueStatus(long recordID) throws Exception { + + } + @Override public void pageWrite(PagedMessage message, int pageNumber) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java index f6c4bdee20..7a07439328 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java @@ -350,6 +350,11 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest { proxy.invokeOperation("pause"); } + @Override + public void pause(boolean persist) throws Exception { + proxy.invokeOperation("pause", persist); + } + @Override public void resume() throws Exception { proxy.invokeOperation("resume"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index e8e04f20e4..9b901fc37b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -336,6 +336,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { proxy.invokeOperation("pause"); } + @Override + public void pause(boolean persist) throws Exception { + proxy.invokeOperation("pause", persist); + } + @Override public void resume() throws Exception { proxy.invokeOperation("resume"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java new file mode 100644 index 0000000000..e383c854de --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.server; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class QueuePeristPauseTest extends ActiveMQTestBase { + + @Test + public void testPauseQueue() throws Exception { + ActiveMQServer server = createServer(true, false); + server.start(); + + Queue queue = server.createQueue(SimpleString.toSimpleString("q1"), + SimpleString.toSimpleString("q1"), + null, true, false); + + queue.pause(true); + + server.stop(); + server.start(); + + for (int i = 0; i < 4; i++) { + server.stop(); + server.start(); + queue = server.locateQueue(SimpleString.toSimpleString("q1")); + Assert.assertTrue(queue.isPaused()); + } + + queue.resume(); + + for (int i = 0; i < 4; i++) { + server.stop(); + server.start(); + queue = server.locateQueue(SimpleString.toSimpleString("q1")); + Assert.assertFalse(queue.isPaused()); + } + + } +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 8eae7d6ab1..bba5dc174e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -56,6 +56,16 @@ public class FakeQueue implements Queue { } + @Override + public void reloadPause(long recordID) { + + } + + @Override + public boolean isPersistedPause() { + return false; + } + @Override public int retryMessages(Filter filter) throws Exception { return 0; @@ -102,6 +112,11 @@ public class FakeQueue implements Queue { } + @Override + public void pause(boolean persist) { + + } + @Override public boolean flushExecutor() { return true;