diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java index 90887b0e2c..ce964aa3ef 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java @@ -135,4 +135,31 @@ public interface AddressControl { @Parameter(name = "durable", desc = "Whether the message is durable") boolean durable, @Parameter(name = "user", desc = "The user to authenticate with") String user, @Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception; + + /** + * Pauses all the queues bound to this address.Messages are no longer delivered to all its bounded queues. + * Newly added queue will be paused too until resume is called. + * @throws java.lang.Exception + */ + @Operation(desc = "Pauses the queues bound to this address", impact = MBeanOperationInfo.ACTION) + void pause() throws Exception; + + /** + * Pauses all the queues bound to this address.Messages are no longer delivered to all its bounded queues.Newly added queue will be paused too until resume is called. + * @param persist if true, the pause state will be persisted. + * @throws java.lang.Exception + */ + @Operation(desc = "Pauses the queues bound to this address", impact = MBeanOperationInfo.ACTION) + void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception; + + /** + * Resume all the queues bound of this address.Messages are delivered again to all its bounded queues. + * @throws java.lang.Exception + */ + @Operation(desc = "Resumes the queues bound to this address", impact = MBeanOperationInfo.ACTION) + void resume() throws Exception; + + @Attribute(desc = "indicates if the queues bound to this address are paused") + boolean isPaused(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index b583408cbb..d2ad001700 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -132,8 +132,6 @@ public class AddressControlImpl extends AbstractControl implements AddressContro if (AuditLogger.isEnabled()) { AuditLogger.getQueueNames(this.addressInfo); } - - String[] result; clearIO(); try { Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName()); @@ -373,6 +371,54 @@ public class AddressControlImpl extends AbstractControl implements AddressContro return MBeanInfoHelper.getMBeanAttributesInfo(AddressControl.class); } + @Override + public void pause() { + pause(false); + } + + @Override + public void pause(boolean persist) { + if (AuditLogger.isEnabled()) { + AuditLogger.pause(addressInfo); + } + checkStarted(); + + clearIO(); + try { + addressInfo.setPostOffice(server.getPostOffice()); + addressInfo.setStorageManager(server.getStorageManager()); + addressInfo.pause(persist); + } finally { + blockOnIO(); + } + } + + + @Override + public void resume() { + if (AuditLogger.isEnabled()) { + AuditLogger.resume(addressInfo); + } + checkStarted(); + + clearIO(); + try { + addressInfo.setPostOffice(server.getPostOffice()); + addressInfo.setStorageManager(server.getStorageManager()); + addressInfo.resume(); + } finally { + blockOnIO(); + } + } + + @Override + public boolean isPaused() { + if (AuditLogger.isEnabled()) { + AuditLogger.isPaused(this.addressInfo); + } + return addressInfo.isPaused(); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -409,6 +455,12 @@ public class AddressControlImpl extends AbstractControl implements AddressContro } } + private void checkStarted() { + if (!server.getPostOffice().isStarted()) { + throw new IllegalStateException("Broker is not started. Queues can not be managed yet"); + } + } + // Inner classes ------------------------------------------------- private enum DurabilityType { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java index 902c16b6d1..54aa3d4985 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java @@ -20,6 +20,7 @@ import java.util.EnumSet; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AddressStatusEncoding; public interface AddressBindingInfo { @@ -28,4 +29,6 @@ public interface AddressBindingInfo { SimpleString getName(); EnumSet getRoutingTypes(); + + AddressStatusEncoding getAddressStatusEncoding(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressQueueStatus.java similarity index 74% rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java rename to artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressQueueStatus.java index f4a8bdd853..9f5124c983 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressQueueStatus.java @@ -17,30 +17,29 @@ package org.apache.activemq.artemis.core.persistence; -public enum QueueStatus { +public enum AddressQueueStatus { PAUSED((short) 0), RUNNING((short) 1); public final short id; - QueueStatus(short id) { + AddressQueueStatus(short id) { this.id = id; } - public static QueueStatus[] values; + public static AddressQueueStatus[] values; static { - QueueStatus[] allValues = QueueStatus.values(); - values = new QueueStatus[allValues.length]; - for (QueueStatus v : allValues) { + AddressQueueStatus[] allValues = AddressQueueStatus.values(); + values = new AddressQueueStatus[allValues.length]; + for (AddressQueueStatus v : allValues) { values[v.id] = v; } } - public static QueueStatus fromID(short id) { + public static AddressQueueStatus fromID(short id) { if (id < 0 || id >= values.length) { return null; - } else { - return values[id]; } + return values[id]; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 73c43febaf..0436398fb1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -311,10 +311,14 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { * @return the id of the journal * @throws Exception */ - long storeQueueStatus(long queueID, QueueStatus status) throws Exception; + long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception; void deleteQueueStatus(long recordID) throws Exception; + long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception; + + void deleteAddressStatus(long recordID) throws Exception; + void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception; void deleteAddressBinding(long tx, long addressBindingID) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index fd14d552d6..238f39b58c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -69,11 +69,12 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; -import org.apache.activemq.artemis.core.persistence.QueueStatus; +import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AddressStatusEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeleteEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding; @@ -1322,7 +1323,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } @Override - public long storeQueueStatus(long queueID, QueueStatus status) throws Exception { + public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception { long recordID = idGenerator.generateID(); readLock(); @@ -1346,6 +1347,31 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } } + @Override + public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception { + long recordID = idGenerator.generateID(); + + readLock(); + try { + bindingsJournal.appendAddRecord(recordID, JournalRecordIds.ADDRESS_STATUS_RECORD, new AddressStatusEncoding(addressID, status), true); + } finally { + readUnLock(); + } + + + return recordID; + } + + @Override + public void deleteAddressStatus(long recordID) throws Exception { + readLock(); + try { + bindingsJournal.appendDeleteRecord(recordID, true); + } finally { + readUnLock(); + } + } + @Override public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception { PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), @@ -1465,6 +1491,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null); HashMap mapBindings = new HashMap<>(); + HashMap mapAddressBindings = new HashMap<>(); for (RecordInfo record : records) { long id = record.id; @@ -1481,6 +1508,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) { PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer); addressBindingInfos.add(bindingEncoding); + mapAddressBindings.put(id, bindingEncoding); } else if (rec == JournalRecordIds.GROUP_RECORD) { GroupingEncoding encoding = newGroupEncoding(id, buffer); groupingInfos.add(encoding); @@ -1500,6 +1528,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp ActiveMQServerLogger.LOGGER.infoNoQueueWithID(statusEncoding.queueID, statusEncoding.getId()); this.deleteQueueStatus(statusEncoding.getId()); } + } else if (rec == JournalRecordIds.ADDRESS_STATUS_RECORD) { + AddressStatusEncoding statusEncoding = newAddressStatusEncoding(id, buffer); + PersistentAddressBindingEncoding addressBindingEncoding = mapAddressBindings.get(statusEncoding.getAddressId()); + if (addressBindingEncoding != null) { + addressBindingEncoding.setAddressStatusEncoding(statusEncoding); + } else { + // unlikely to happen, so I didn't bother about the Logger method + ActiveMQServerLogger.LOGGER.infoNoAddressWithID(statusEncoding.getAddressId(), statusEncoding.getId()); + this.deleteAddressStatus(statusEncoding.getId()); + } } else { // unlikely to happen ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new Exception("invalid record type " + rec)); @@ -1962,6 +2000,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp return setting; } + static AddressStatusEncoding newAddressStatusEncoding(long id, ActiveMQBuffer buffer) { + AddressStatusEncoding addressStatus = new AddressStatusEncoding(); + addressStatus.decode(buffer); + addressStatus.setId(id); + return addressStatus; + } + /** * @param id * @param buffer diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 0723d70cc8..e579704152 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -72,6 +72,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_STATUS_RECORD; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE; @@ -656,6 +657,9 @@ public final class DescribeJournal { case ADDRESS_BINDING_RECORD: return AbstractJournalStorageManager.newAddressBindingEncoding(id, buffer); + case ADDRESS_STATUS_RECORD: + return AbstractJournalStorageManager.newAddressStatusEncoding(id, buffer); + default: return null; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java index 348ac9b3dc..0fcb819334 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java @@ -88,4 +88,6 @@ public final class JournalRecordIds { public static final byte ADD_MESSAGE_PROTOCOL = 45; + public static final byte ADDRESS_STATUS_RECORD = 46; + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AddressStatusEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AddressStatusEncoding.java new file mode 100644 index 0000000000..aef80a907c --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AddressStatusEncoding.java @@ -0,0 +1,86 @@ +/* + * Copyright 2019 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.persistence.impl.journal.codec; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; +import org.apache.activemq.artemis.utils.DataConstants; + +public class AddressStatusEncoding implements EncodingSupport { + + private AddressQueueStatus status; + + private long addressId; + + private long id; + + public AddressStatusEncoding(long addressId, AddressQueueStatus status) { + this.status = status; + this.addressId = addressId; + } + + public AddressStatusEncoding() { + } + + public AddressQueueStatus getStatus() { + return status; + } + + public void setStatus(AddressQueueStatus status) { + this.status = status; + } + + public long getAddressId() { + return addressId; + } + + public void setAddressId(long addressId) { + this.addressId = addressId; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + @Override + public int getEncodeSize() { + return DataConstants.SIZE_LONG + DataConstants.SIZE_SHORT; + } + + @Override + public void encode(ActiveMQBuffer buffer) { + buffer.writeLong(addressId); + buffer.writeShort(status.id); + } + + @Override + public void decode(ActiveMQBuffer buffer) { + this.addressId = buffer.readLong(); + short shortStatus = buffer.readShort(); + this.status = AddressQueueStatus.fromID(shortStatus); + } + + @Override + public String toString() { + return "AddressStatusEncoding{" + "status=" + status + ", id=" + addressId + '}'; + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java index 3ed3f96a36..057fa2792c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java @@ -32,6 +32,7 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres public SimpleString name; public boolean autoCreated; + public AddressStatusEncoding addressStatusEncoding; public EnumSet routingTypes; @@ -82,6 +83,15 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres return routingTypes; } + @Override + public AddressStatusEncoding getAddressStatusEncoding() { + return addressStatusEncoding; + } + + public void setAddressStatusEncoding(AddressStatusEncoding addressStatusEncoding) { + this.addressStatusEncoding = addressStatusEncoding; + } + @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java index fe2b1f5b28..12970ed483 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java @@ -18,16 +18,16 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.persistence.QueueStatus; +import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; import org.apache.activemq.artemis.utils.DataConstants; public class QueueStatusEncoding extends QueueEncoding { - private QueueStatus status; + private AddressQueueStatus status; private long id; - public QueueStatusEncoding(long queueID, QueueStatus status) { + public QueueStatusEncoding(long queueID, AddressQueueStatus status) { super(queueID); this.status = status; } @@ -40,7 +40,7 @@ public class QueueStatusEncoding extends QueueEncoding { public void decode(final ActiveMQBuffer buffer) { super.decode(buffer); short shortStatus = buffer.readShort(); - this.status = QueueStatus.fromID(shortStatus); + this.status = AddressQueueStatus.fromID(shortStatus); } @Override @@ -49,7 +49,7 @@ public class QueueStatusEncoding extends QueueEncoding { buffer.writeShort(status.id); } - public QueueStatus getStatus() { + public AddressQueueStatus getStatus() { return status; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 577ce8b6fe..ef9ff94e0f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -43,7 +43,7 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; -import org.apache.activemq.artemis.core.persistence.QueueStatus; +import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; @@ -88,7 +88,7 @@ public class NullStorageManager implements StorageManager { } @Override - public long storeQueueStatus(long queueID, QueueStatus status) throws Exception { + public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception { return 0; } @@ -101,6 +101,16 @@ public class NullStorageManager implements StorageManager { } + @Override + public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception { + return 0; + } + + @Override + public void deleteAddressStatus(long recordID) throws Exception { + + } + @Override public void injectMonitor(FileStoreMonitor monitor) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 4751473f43..babf60caf2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -430,6 +430,10 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 221080, value = "Deploying address {0} supporting {1}", format = Message.Format.MESSAGE_FORMAT) void deployAddress(String addressName, String routingTypes); + @LogMessage(level = Logger.Level.INFO) + @Message(id = 221081, value = "There is no address with ID {0}, deleting record {1}", format = Message.Format.MESSAGE_FORMAT) + void infoNoAddressWithID(Long id, Long record); + @LogMessage(level = Logger.Level.WARN) @Message(id = 222000, value = "ActiveMQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index f1195c6aee..4b2d07cba8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -24,6 +24,12 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.utils.CompositeAddress; @@ -32,6 +38,7 @@ import org.apache.activemq.artemis.utils.PrefixUtil; public class AddressInfo { private long id; + private long pauseStatusRecord = -1; private final SimpleString name; @@ -53,6 +60,11 @@ public class AddressInfo { private long bindingRemovedTimestamp = -1; + private volatile boolean paused = false; + + private PostOffice postOffice; + private StorageManager storageManager; + public AddressInfo(SimpleString name) { this(name, EnumSet.noneOf(RoutingType.class)); } @@ -136,21 +148,94 @@ public class AddressInfo { this.bindingRemovedTimestamp = bindingRemovedTimestamp; } + public synchronized void pause(boolean persist) { + if (postOffice == null) { + throw new IllegalStateException(""); + } + if (storageManager == null && persist) { + throw new IllegalStateException(""); + } + if (this.paused) { + return; + } + try { + if (persist) { + this.pauseStatusRecord = storageManager.storeAddressStatus(this.getId(), AddressQueueStatus.PAUSED); + } + Bindings bindings = postOffice.lookupBindingsForAddress(this.getName()); + if (bindings != null) { + for (Binding binding : bindings.getBindings()) { + if (binding instanceof QueueBinding) { + ((QueueBinding) binding).getQueue().pause(persist); + } + } + } + this.paused = true; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + public synchronized void resume() { + if (postOffice == null) { + throw new IllegalStateException(""); + } + if (storageManager == null && this.pauseStatusRecord > 0) { + throw new IllegalStateException(""); + } + if (!this.paused) { + return; + } + try { + if (this.pauseStatusRecord > 0) { + storageManager.deleteAddressStatus(this.pauseStatusRecord); + } + Bindings bindings = postOffice.lookupBindingsForAddress(this.getName()); + if (bindings != null) { + for (Binding binding : bindings.getBindings()) { + if (binding instanceof QueueBinding) { + ((QueueBinding) binding).getQueue().resume(); + } + } + } + this.paused = false; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + boolean isPersisted() { + return this.paused && this.pauseStatusRecord > 0; + } + + public boolean isPaused() { + return this.paused; + } + + public void setPostOffice(PostOffice postOffice) { + this.postOffice = postOffice; + } + + public void setStorageManager(StorageManager storageManager) { + this.storageManager = storageManager; + } + @Override public String toString() { - StringBuffer buff = new StringBuffer(); - buff.append("Address [name=" + name); - buff.append(", id=" + id); + StringBuilder buff = new StringBuilder(); + buff.append("Address [name=").append(name); + buff.append(", id=").append(id); buff.append(", routingTypes={"); for (RoutingType routingType : getRoutingTypes()) { - buff.append(routingType.toString() + ","); + buff.append(routingType.toString()).append(","); } // delete hanging comma if (buff.charAt(buff.length() - 1) == ',') { buff.deleteCharAt(buff.length() - 1); } buff.append("}"); - buff.append(", autoCreated=" + autoCreated); + buff.append(", autoCreated=").append(autoCreated); + buff.append(", paused=").append(paused); buff.append("]"); return buff.toString(); } @@ -166,6 +251,9 @@ public class AddressInfo { public AddressInfo create(SimpleString name, RoutingType routingType) { AddressInfo info = new AddressInfo(name, routingType); info.setInternal(this.internal); + if (paused) { + info.pause(this.pauseStatusRecord > 0); + } return info; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 67b10070c6..f3bd10e4f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; -import org.apache.activemq.artemis.core.persistence.QueueStatus; +import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord; @@ -170,7 +170,7 @@ public class PostOfficeJournalLoader implements JournalLoader { if (queueBindingInfo.getQueueStatusEncodings() != null) { for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) { - if (encoding.getStatus() == QueueStatus.PAUSED) + if (encoding.getStatus() == AddressQueueStatus.PAUSED) queue.reloadPause(encoding.getId()); } } @@ -193,6 +193,11 @@ public class PostOfficeJournalLoader implements JournalLoader { AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes()); addressInfo.setId(addressBindingInfo.getId()); + if (addressBindingInfo.getAddressStatusEncoding() != null && addressBindingInfo.getAddressStatusEncoding().getStatus() == AddressQueueStatus.PAUSED) { + addressInfo.setStorageManager(storageManager); + addressInfo.setPostOffice(postOffice); + addressInfo.pause(true); + } postOffice.reloadAddressInfo(addressInfo); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 44b55e0c97..073bafa6f9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -59,7 +59,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; import org.apache.activemq.artemis.core.persistence.OperationContext; -import org.apache.activemq.artemis.core.persistence.QueueStatus; +import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -578,6 +578,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.factory = factory; registerMeters(); + if (this.addressInfo != null && this.addressInfo.isPaused()) { + this.pause(this.addressInfo.isPersisted()); + } } // Bindable implementation ------------------------------------------------------------------------------------- @@ -2368,7 +2371,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (pauseStatusRecord >= 0) { storageManager.deleteQueueStatus(pauseStatusRecord); } - pauseStatusRecord = storageManager.storeQueueStatus(this.id, QueueStatus.PAUSED); + pauseStatusRecord = storageManager.storeQueueStatus(this.id, AddressQueueStatus.PAUSED); } } catch (Exception e) { ActiveMQServerLogger.LOGGER.unableToPauseQueue(e); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 0a15022a60..7c0ce44ba2 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -40,10 +40,10 @@ import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; +import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; -import org.apache.activemq.artemis.core.persistence.QueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; @@ -251,7 +251,7 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public long storeQueueStatus(long queueID, QueueStatus status) throws Exception { + public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception { return 0; } @@ -713,5 +713,15 @@ public class TransactionImplTest extends ActiveMQTestBase { public long getCurrentID() { return 0; } + + @Override + public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception { + return 0; + } + + @Override + public void deleteAddressStatus(long recordID) throws Exception { + + } } } diff --git a/docs/user-manual/en/management.md b/docs/user-manual/en/management.md index 60ef2a184f..7f33da3a3b 100644 --- a/docs/user-manual/en/management.md +++ b/docs/user-manual/en/management.md @@ -135,6 +135,13 @@ Individual addresses can be managed using the `AddressControl` interface. `removeRole()` methods. You can list all the roles associated to the queue with the `getRoles()` method +- Pausing and resuming Address + + The `AddressControl` can pause and resume an address and all the queues that + are bound to it. Newly added queue will be paused too until the address is resumed. + Thus all messages sent to the address will be recived but not delivered. When it is + resumed, delivering will occur again. + #### Queue Management The bulk of the management API deals with queues. The `QueueControl` interface diff --git a/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java b/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java index b573b11cbb..44f17eea6a 100644 --- a/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java +++ b/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java @@ -35,6 +35,7 @@ public class GroovyRun { public static final String TWO_FOUR = "ARTEMIS-240"; public static final String TWO_SIX_THREE = "ARTEMIS-263"; public static final String TWO_SEVEN_ZERO = "ARTEMIS-270"; + public static final String TWO_NINE_ZERO = "ARTEMIS-290"; public static final String HORNETQ_235 = "HORNETQ-235"; public static final String HORNETQ_247 = "HORNETQ-247"; @@ -115,7 +116,7 @@ public class GroovyRun { } public static void assertEquals(Object value1, Object value2) { - if (!value1.equals(value2)) { + if ((value1 == null && value2 == null) || !value1.equals(value2)) { throw new RuntimeException(value1 + "!=" + value2); } } diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/address_ispaused.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/address_ispaused.groovy new file mode 100644 index 0000000000..e6083f2ed7 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/address_ispaused.groovy @@ -0,0 +1,26 @@ +package journalcompatibility + +import org.apache.activemq.artemis.api.core.management.AddressControl +import org.apache.activemq.artemis.api.core.management.ResourceNames +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +String serverType = arg[0]; +AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic"); +GroovyRun.assertNotNull(addressControl) +GroovyRun.assertTrue(addressControl.isPaused()) \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/address_isrunning.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/address_isrunning.groovy new file mode 100644 index 0000000000..b72214dfe0 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/address_isrunning.groovy @@ -0,0 +1,26 @@ +package journalcompatibility + +import org.apache.activemq.artemis.api.core.management.AddressControl +import org.apache.activemq.artemis.api.core.management.ResourceNames +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +String serverType = arg[0]; +AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic"); +GroovyRun.assertNotNull(addressControl) +GroovyRun.assertTrue(!addressControl.isPaused()) \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/pause_address.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/pause_address.groovy new file mode 100644 index 0000000000..06e1b711cf --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/pause_address.groovy @@ -0,0 +1,26 @@ +package journalcompatibility + +import org.apache.activemq.artemis.api.core.management.AddressControl +import org.apache.activemq.artemis.api.core.management.ResourceNames +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic"); +GroovyRun.assertTrue(!addressControl.isPaused()) +addressControl.pause(true) +GroovyRun.assertTrue(addressControl.isPaused()) \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/pause_queue.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/pause_queue.groovy new file mode 100644 index 0000000000..93724822e1 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/pause_queue.groovy @@ -0,0 +1,25 @@ +package journalcompatibility + +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.server.Queue +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue")) +queue.pause(true); \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/queue_ispaused.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/queue_ispaused.groovy new file mode 100644 index 0000000000..f57ded4d57 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/queue_ispaused.groovy @@ -0,0 +1,25 @@ +package journalcompatibility + +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.server.Queue +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue")) +GroovyRun.assertTrue(queue.isPaused()) \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/queue_isrunning.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/queue_isrunning.groovy new file mode 100644 index 0000000000..c9170ebd89 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/queue_isrunning.groovy @@ -0,0 +1,25 @@ +package journalcompatibility + +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.server.Queue +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue")) +GroovyRun.assertTrue(!queue.isPaused()) \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/receiveMessagesAddress.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/receiveMessagesAddress.groovy new file mode 100644 index 0000000000..ffc4fbefa9 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/receiveMessagesAddress.groovy @@ -0,0 +1,48 @@ +package journalcompatibility + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun +import org.apache.activemq.artemis.api.core.management.AddressControl +import org.apache.activemq.artemis.api.core.management.ResourceNames +import org.apache.activemq.artemis.core.server.impl.AddressInfo + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +try { + cf.setEnable1xPrefixes(true); +} catch (Throwable totallyIgnored) { + // older versions will not have this method, dont even bother about seeing the stack trace or exception +} +Connection connection = cf.createConnection(); +Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +Topic topic = session.createTopic("jms.topic.MyTopic"); +MessageProducer producer = session.createProducer(topic); +MessageConsumer messageConsumer1 = session.createConsumer(topic); +MessageConsumer messageConsumer2 = session.createConsumer(topic); +TextMessage message = session.createTextMessage("This is a text message"); +System.out.println("Sent message: " + message.getText()); +producer.send(message); +connection.start(); +TextMessage messageReceived = (TextMessage) messageConsumer1.receive(5000); +GroovyRun.assertNotNull(messageReceived); +System.out.println("Consumer 1 Received message: " + messageReceived.getText()); +messageReceived = (TextMessage) messageConsumer2.receive(5000); +GroovyRun.assertNotNull(messageReceived); +System.out.println("Consumer 2 Received message: " + messageReceived.getText()); \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/resume_address.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/resume_address.groovy new file mode 100644 index 0000000000..500b78ad6b --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/resume_address.groovy @@ -0,0 +1,26 @@ +package journalcompatibility + +import org.apache.activemq.artemis.api.core.management.AddressControl +import org.apache.activemq.artemis.api.core.management.ResourceNames +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic"); +GroovyRun.assertTrue(addressControl.isPaused()) +addressControl.resume() +GroovyRun.assertTrue(!addressControl.isPaused()) \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/resume_queue.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/resume_queue.groovy new file mode 100644 index 0000000000..26c505e23b --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/resume_queue.groovy @@ -0,0 +1,25 @@ +package journalcompatibility + +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.server.Queue +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue")) +queue.resume(); diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/sendMessagesAddress.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/sendMessagesAddress.groovy new file mode 100644 index 0000000000..6563b33055 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/sendMessagesAddress.groovy @@ -0,0 +1,82 @@ +package journalcompatibility + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun +import org.apache.activemq.artemis.api.core.management.AddressControl +import org.apache.activemq.artemis.api.core.management.ResourceNames +import org.apache.activemq.artemis.core.server.impl.AddressInfo + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +try { + cf.setEnable1xPrefixes(true); +} catch (Throwable totallyIgnored) { + // older versions will not have this method, dont even bother about seeing the stack trace or exception +} +Connection connection = cf.createConnection(); +Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +Topic topic = session.createTopic("jms.topic.MyTopic"); +MessageProducer producer = session.createProducer(topic); +MessageConsumer messageConsumer1 = session.createConsumer(topic); +MessageConsumer messageConsumer2 = session.createConsumer(topic); +TextMessage message = session.createTextMessage("This is a text message"); +System.out.println("Sent message: " + message.getText()); +AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic"); +addressControl.pause(true); +producer.send(message); +connection.start(); +TextMessage messageReceived = (TextMessage) messageConsumer1.receive(5000); +GroovyRun.assertNull(messageReceived); +messageReceived = (TextMessage) messageConsumer2.receive(5000); +GroovyRun.assertNull(messageReceived); + +// +// +// +//ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +//try { +// cf.setEnable1xPrefixes(true); +//} catch (Throwable totallyIgnored) { +// // older versions will not have this method, dont even bother about seeing the stack trace or exception +//} +//Connection connection = cf.createConnection(); +//connection.setClientID("myClientID"); +//Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); +//connection.start(); +// +//Topic topic = session.createTopic("jms.topic.MyTopic"); +//TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1"); +// +//MessageProducer producer = session.createProducer(topic); +//producer.setDeliveryMode(DeliveryMode.PERSISTENT); +// +//AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic"); +// +//addressControl.pause(true) +//for (int i = 0; i < 100; i++) { +// TextMessage mess = session.createTextMessage("msg" + i); +// producer.send(mess); +//} +//for (int i = 0; i < 100; i++) { +// TextMessage m = (TextMessage) subscriber1.receive(500) +// GroovyRun.assertNull(m) +//} +//session.close(); +//connection.close(); \ No newline at end of file diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/AddressPauseJournalCompatibilityTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/AddressPauseJournalCompatibilityTest.java new file mode 100644 index 0000000000..a8cbd27ee5 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/AddressPauseJournalCompatibilityTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_NINE_ZERO; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * To run this test on the IDE and debug it, run the compatibility-tests through a command line once: + * + * cd /compatibility-tests + * mvn install -Ptests | tee output.log + * + * on the output.log you will see the output generated by {@link #getClasspath(String)} + * + * On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test. + * On Idea you would do the following: + * + * Run->Edit Configuration->Add ArtemisMeshTest and add your properties. + */ +@RunWith(Parameterized.class) +public class AddressPauseJournalCompatibilityTest extends VersionedBase { + + // this will ensure that all tests in this class are run twice, + // once with "true" passed to the class' constructor and once with "false" + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + // we don't need every single version ever released.. + // if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time + List combinations = new ArrayList<>(); + + /* + // during development sometimes is useful to comment out the combinations + // and add the ones you are interested.. example: + */ + // combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE}); + // combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE}); + + combinations.add(new Object[]{null, TWO_NINE_ZERO, SNAPSHOT}); + combinations.add(new Object[]{null, SNAPSHOT, TWO_NINE_ZERO}); + // the purpose on this one is just to validate the test itself. + /// if it can't run against itself it won't work at all + combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public AddressPauseJournalCompatibilityTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Before + public void removeFolder() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + serverFolder.getRoot().mkdirs(); + } + + @After + public void tearDown() { + try { + stopServer(serverClassloader); + } catch (Throwable ignored) { + } + try { + stopServer(receiverClassloader); + } catch (Throwable ignored) { + } + } + + @Test + public void testSendReceive() throws Throwable { + setVariable(senderClassloader, "persistent", true); + startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true); + evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); + evaluate(senderClassloader, "journalcompatibility/queue_isrunning.groovy"); + evaluate(senderClassloader, "journalcompatibility/pause_queue.groovy"); + evaluate(senderClassloader, "journalcompatibility/queue_ispaused.groovy"); + stopServer(senderClassloader); + + setVariable(receiverClassloader, "persistent", true); + startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false); + + setVariable(receiverClassloader, "latch", null); + evaluate(receiverClassloader, "journalcompatibility/queue_ispaused.groovy"); + evaluate(receiverClassloader, "journalcompatibility/resume_queue.groovy"); + evaluate(receiverClassloader, "journalcompatibility/resume_queue.groovy"); + evaluate(receiverClassloader, "journalcompatibility/queue_isrunning.groovy"); + evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); + } + + @Test + public void testSendReceiveTopic() throws Throwable { + if (TWO_NINE_ZERO.equals(this.sender)) { + return; + } + setVariable(senderClassloader, "persistent", true); + startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true); + evaluate(senderClassloader, "journalcompatibility/receiveMessagesAddress.groovy", server, sender); + evaluate(senderClassloader, "journalcompatibility/sendMessagesAddress.groovy", server, sender); + evaluate(senderClassloader, "journalcompatibility/address_ispaused.groovy", server); + evaluate(senderClassloader, "journalcompatibility/resume_address.groovy"); + evaluate(senderClassloader, "journalcompatibility/address_isrunning.groovy", server); + evaluate(senderClassloader, "journalcompatibility/pause_address.groovy"); + evaluate(senderClassloader, "journalcompatibility/address_ispaused.groovy", server); + stopServer(senderClassloader); + + setVariable(receiverClassloader, "persistent", true); + startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false); + stopServer(receiverClassloader); + + startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true); + evaluate(senderClassloader, "journalcompatibility/address_ispaused.groovy", server); + evaluate(senderClassloader, "journalcompatibility/resume_address.groovy"); + evaluate(senderClassloader, "journalcompatibility/address_isrunning.groovy", server); + evaluate(senderClassloader, "journalcompatibility/receiveMessagesAddress.groovy", server, sender); + stopServer(senderClassloader); + } +} + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java index ba84d8fbe7..859c613717 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java @@ -53,5 +53,24 @@ public class AddressConfigTest extends ActiveMQTestBase { Set routingTypeSet = new HashSet<>(); routingTypeSet.add(RoutingType.MULTICAST); assertEquals(routingTypeSet, addressInfo.getRoutingTypes()); + assertFalse(addressInfo.isPaused()); + + addressInfo.setPostOffice(server.getPostOffice()); + addressInfo.setStorageManager(server.getStorageManager()); + addressInfo.pause(true); + assertTrue(addressInfo.isPaused()); + long id = addressInfo.getId(); + server.stop(); + server.start(); + addressInfo = server.getAddressInfo(SimpleString.toSimpleString("myAddress")); + assertNotNull(addressInfo); + routingTypeSet = new HashSet<>(); + routingTypeSet.add(RoutingType.MULTICAST); + assertEquals(routingTypeSet, addressInfo.getRoutingTypes()); + assertEquals(id, addressInfo.getId()); + assertTrue(addressInfo.isPaused()); + addressInfo.setPostOffice(server.getPostOffice()); + addressInfo.setStorageManager(server.getStorageManager()); + addressInfo.resume(); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AddressPauseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AddressPauseTest.java new file mode 100644 index 0000000000..79b9e71f58 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AddressPauseTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2019 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import org.apache.activemq.artemis.api.core.management.AddressControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class AddressPauseTest extends JMSTestBase { + + @Test + public void testPauseAddress() throws Exception { + try (Connection connection = cf.createConnection()) { + connection.setClientID("myClientID"); + connection.start(); + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + Topic topic = session.createTopic("jms.topic.MyTopic"); + TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1"); + AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic"); + MessageProducer producer = session.createProducer(topic); + final int numMessages = 100; + for (int i = 0; i < numMessages; i++) { + TextMessage mess = session.createTextMessage("msg" + i); + producer.send(mess); + } + for (int i = 0; i < numMessages; i++) { + TextMessage m = (TextMessage) subscriber1.receive(5000); + Assert.assertNotNull(m); + } + //Pausing the subscriptions + addressControl.pause(); + Assert.assertTrue(addressControl.isPaused()); + //subscriber2 should be paused too + TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, "my-subscription2"); + for (int i = 0; i < numMessages; i++) { + TextMessage mess = session.createTextMessage("msg" + i); + producer.send(mess); + } + TextMessage message = (TextMessage) subscriber1.receive(5000); + Assert.assertNull(message); + message = (TextMessage) subscriber2.receive(5000); + Assert.assertNull(message); + //Resuming the subscriptions + addressControl.resume(); + for (int i = 0; i < numMessages; i++) { + TextMessage m = (TextMessage) subscriber1.receive(5000); + Assert.assertNotNull(m); + } + for (int i = 0; i < numMessages; i++) { + TextMessage m = (TextMessage) subscriber2.receive(5000); + Assert.assertNotNull(m); + } + } + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index ab1932fe3a..a5c30cfe00 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -55,7 +55,7 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; -import org.apache.activemq.artemis.core.persistence.QueueStatus; +import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; @@ -624,7 +624,7 @@ public class SendAckFailTest extends SpawnedTestBase { } @Override - public long storeQueueStatus(long queueID, QueueStatus status) throws Exception { + public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception { return manager.storeQueueStatus(queueID, status); } @@ -633,6 +633,17 @@ public class SendAckFailTest extends SpawnedTestBase { manager.deleteQueueStatus(recordID); } + @Override + public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception { + return manager.storeAddressStatus(addressID, status); + } + + + @Override + public void deleteAddressStatus(long recordID) throws Exception { + manager.deleteAddressStatus(recordID); + } + @Override public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception { manager.addAddressBinding(tx, addressInfo); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java index 5e5dc5438c..9ed7bef590 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java @@ -113,6 +113,26 @@ public class AddressControlUsingCoreTest extends AddressControlTest { return (long) proxy.retrieveAttributeValue("unRoutedMessageCount"); } + @Override + public void pause() throws Exception { + proxy.invokeOperation("pause"); + } + + @Override + public void pause(boolean persist) throws Exception { + proxy.invokeOperation("pause", persist); + } + + @Override + public void resume() throws Exception { + proxy.invokeOperation("resume"); + } + + @Override + public boolean isPaused() { + return (boolean) proxy.retrieveAttributeValue("paused"); + } + @Override public String sendMessage(Map headers, int type,