ARTEMIS-2401 Implement the Pause method for an Address
Adding support to pause an Address and all its bound queues. Jira: https://issues.apache.org/jira/browse/ARTEMIS-2401
This commit is contained in:
parent
30ee0daa90
commit
3aa3fa777f
|
@ -135,4 +135,31 @@ public interface AddressControl {
|
|||
@Parameter(name = "durable", desc = "Whether the message is durable") boolean durable,
|
||||
@Parameter(name = "user", desc = "The user to authenticate with") String user,
|
||||
@Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception;
|
||||
|
||||
/**
|
||||
* Pauses all the queues bound to this address.Messages are no longer delivered to all its bounded queues.
|
||||
* Newly added queue will be paused too until resume is called.
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@Operation(desc = "Pauses the queues bound to this address", impact = MBeanOperationInfo.ACTION)
|
||||
void pause() throws Exception;
|
||||
|
||||
/**
|
||||
* Pauses all the queues bound to this address.Messages are no longer delivered to all its bounded queues.Newly added queue will be paused too until resume is called.
|
||||
* @param persist if true, the pause state will be persisted.
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@Operation(desc = "Pauses the queues bound to this address", impact = MBeanOperationInfo.ACTION)
|
||||
void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception;
|
||||
|
||||
/**
|
||||
* Resume all the queues bound of this address.Messages are delivered again to all its bounded queues.
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@Operation(desc = "Resumes the queues bound to this address", impact = MBeanOperationInfo.ACTION)
|
||||
void resume() throws Exception;
|
||||
|
||||
@Attribute(desc = "indicates if the queues bound to this address are paused")
|
||||
boolean isPaused();
|
||||
|
||||
}
|
||||
|
|
|
@ -132,8 +132,6 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
|
|||
if (AuditLogger.isEnabled()) {
|
||||
AuditLogger.getQueueNames(this.addressInfo);
|
||||
}
|
||||
|
||||
String[] result;
|
||||
clearIO();
|
||||
try {
|
||||
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
|
||||
|
@ -373,6 +371,54 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
|
|||
return MBeanInfoHelper.getMBeanAttributesInfo(AddressControl.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause() {
|
||||
pause(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause(boolean persist) {
|
||||
if (AuditLogger.isEnabled()) {
|
||||
AuditLogger.pause(addressInfo);
|
||||
}
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
addressInfo.setPostOffice(server.getPostOffice());
|
||||
addressInfo.setStorageManager(server.getStorageManager());
|
||||
addressInfo.pause(persist);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
if (AuditLogger.isEnabled()) {
|
||||
AuditLogger.resume(addressInfo);
|
||||
}
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
addressInfo.setPostOffice(server.getPostOffice());
|
||||
addressInfo.setStorageManager(server.getStorageManager());
|
||||
addressInfo.resume();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPaused() {
|
||||
if (AuditLogger.isEnabled()) {
|
||||
AuditLogger.isPaused(this.addressInfo);
|
||||
}
|
||||
return addressInfo.isPaused();
|
||||
}
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
||||
// Protected -----------------------------------------------------
|
||||
|
@ -409,6 +455,12 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
|
|||
}
|
||||
}
|
||||
|
||||
private void checkStarted() {
|
||||
if (!server.getPostOffice().isStarted()) {
|
||||
throw new IllegalStateException("Broker is not started. Queues can not be managed yet");
|
||||
}
|
||||
}
|
||||
|
||||
// Inner classes -------------------------------------------------
|
||||
|
||||
private enum DurabilityType {
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.util.EnumSet;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AddressStatusEncoding;
|
||||
|
||||
public interface AddressBindingInfo {
|
||||
|
||||
|
@ -28,4 +29,6 @@ public interface AddressBindingInfo {
|
|||
SimpleString getName();
|
||||
|
||||
EnumSet<RoutingType> getRoutingTypes();
|
||||
|
||||
AddressStatusEncoding getAddressStatusEncoding();
|
||||
}
|
||||
|
|
|
@ -17,30 +17,29 @@
|
|||
|
||||
package org.apache.activemq.artemis.core.persistence;
|
||||
|
||||
public enum QueueStatus {
|
||||
public enum AddressQueueStatus {
|
||||
PAUSED((short) 0), RUNNING((short) 1);
|
||||
|
||||
public final short id;
|
||||
|
||||
QueueStatus(short id) {
|
||||
AddressQueueStatus(short id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public static QueueStatus[] values;
|
||||
public static AddressQueueStatus[] values;
|
||||
|
||||
static {
|
||||
QueueStatus[] allValues = QueueStatus.values();
|
||||
values = new QueueStatus[allValues.length];
|
||||
for (QueueStatus v : allValues) {
|
||||
AddressQueueStatus[] allValues = AddressQueueStatus.values();
|
||||
values = new AddressQueueStatus[allValues.length];
|
||||
for (AddressQueueStatus v : allValues) {
|
||||
values[v.id] = v;
|
||||
}
|
||||
}
|
||||
|
||||
public static QueueStatus fromID(short id) {
|
||||
public static AddressQueueStatus fromID(short id) {
|
||||
if (id < 0 || id >= values.length) {
|
||||
return null;
|
||||
} else {
|
||||
return values[id];
|
||||
}
|
||||
return values[id];
|
||||
}
|
||||
}
|
|
@ -311,10 +311,14 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
|||
* @return the id of the journal
|
||||
* @throws Exception
|
||||
*/
|
||||
long storeQueueStatus(long queueID, QueueStatus status) throws Exception;
|
||||
long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception;
|
||||
|
||||
void deleteQueueStatus(long recordID) throws Exception;
|
||||
|
||||
long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception;
|
||||
|
||||
void deleteAddressStatus(long recordID) throws Exception;
|
||||
|
||||
void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception;
|
||||
|
||||
void deleteAddressBinding(long tx, long addressBindingID) throws Exception;
|
||||
|
|
|
@ -69,11 +69,12 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
|
|||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AddressStatusEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeleteEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
|
||||
|
@ -1322,7 +1323,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
}
|
||||
|
||||
@Override
|
||||
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
|
||||
public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception {
|
||||
long recordID = idGenerator.generateID();
|
||||
|
||||
readLock();
|
||||
|
@ -1346,6 +1347,31 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception {
|
||||
long recordID = idGenerator.generateID();
|
||||
|
||||
readLock();
|
||||
try {
|
||||
bindingsJournal.appendAddRecord(recordID, JournalRecordIds.ADDRESS_STATUS_RECORD, new AddressStatusEncoding(addressID, status), true);
|
||||
} finally {
|
||||
readUnLock();
|
||||
}
|
||||
|
||||
|
||||
return recordID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAddressStatus(long recordID) throws Exception {
|
||||
readLock();
|
||||
try {
|
||||
bindingsJournal.appendDeleteRecord(recordID, true);
|
||||
} finally {
|
||||
readUnLock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
|
||||
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(),
|
||||
|
@ -1465,6 +1491,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null);
|
||||
|
||||
HashMap<Long, PersistentQueueBindingEncoding> mapBindings = new HashMap<>();
|
||||
HashMap<Long, PersistentAddressBindingEncoding> mapAddressBindings = new HashMap<>();
|
||||
|
||||
for (RecordInfo record : records) {
|
||||
long id = record.id;
|
||||
|
@ -1481,6 +1508,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
} else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) {
|
||||
PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer);
|
||||
addressBindingInfos.add(bindingEncoding);
|
||||
mapAddressBindings.put(id, bindingEncoding);
|
||||
} else if (rec == JournalRecordIds.GROUP_RECORD) {
|
||||
GroupingEncoding encoding = newGroupEncoding(id, buffer);
|
||||
groupingInfos.add(encoding);
|
||||
|
@ -1500,6 +1528,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
ActiveMQServerLogger.LOGGER.infoNoQueueWithID(statusEncoding.queueID, statusEncoding.getId());
|
||||
this.deleteQueueStatus(statusEncoding.getId());
|
||||
}
|
||||
} else if (rec == JournalRecordIds.ADDRESS_STATUS_RECORD) {
|
||||
AddressStatusEncoding statusEncoding = newAddressStatusEncoding(id, buffer);
|
||||
PersistentAddressBindingEncoding addressBindingEncoding = mapAddressBindings.get(statusEncoding.getAddressId());
|
||||
if (addressBindingEncoding != null) {
|
||||
addressBindingEncoding.setAddressStatusEncoding(statusEncoding);
|
||||
} else {
|
||||
// unlikely to happen, so I didn't bother about the Logger method
|
||||
ActiveMQServerLogger.LOGGER.infoNoAddressWithID(statusEncoding.getAddressId(), statusEncoding.getId());
|
||||
this.deleteAddressStatus(statusEncoding.getId());
|
||||
}
|
||||
} else {
|
||||
// unlikely to happen
|
||||
ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new Exception("invalid record type " + rec));
|
||||
|
@ -1962,6 +2000,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
return setting;
|
||||
}
|
||||
|
||||
static AddressStatusEncoding newAddressStatusEncoding(long id, ActiveMQBuffer buffer) {
|
||||
AddressStatusEncoding addressStatus = new AddressStatusEncoding();
|
||||
addressStatus.decode(buffer);
|
||||
addressStatus.setId(id);
|
||||
return addressStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param id
|
||||
* @param buffer
|
||||
|
|
|
@ -72,6 +72,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
|
|||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_STATUS_RECORD;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
|
||||
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE;
|
||||
|
@ -656,6 +657,9 @@ public final class DescribeJournal {
|
|||
case ADDRESS_BINDING_RECORD:
|
||||
return AbstractJournalStorageManager.newAddressBindingEncoding(id, buffer);
|
||||
|
||||
case ADDRESS_STATUS_RECORD:
|
||||
return AbstractJournalStorageManager.newAddressStatusEncoding(id, buffer);
|
||||
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -88,4 +88,6 @@ public final class JournalRecordIds {
|
|||
|
||||
public static final byte ADD_MESSAGE_PROTOCOL = 45;
|
||||
|
||||
public static final byte ADDRESS_STATUS_RECORD = 46;
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Copyright 2019 The Apache Software Foundation.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class AddressStatusEncoding implements EncodingSupport {
|
||||
|
||||
private AddressQueueStatus status;
|
||||
|
||||
private long addressId;
|
||||
|
||||
private long id;
|
||||
|
||||
public AddressStatusEncoding(long addressId, AddressQueueStatus status) {
|
||||
this.status = status;
|
||||
this.addressId = addressId;
|
||||
}
|
||||
|
||||
public AddressStatusEncoding() {
|
||||
}
|
||||
|
||||
public AddressQueueStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(AddressQueueStatus status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public long getAddressId() {
|
||||
return addressId;
|
||||
}
|
||||
|
||||
public void setAddressId(long addressId) {
|
||||
this.addressId = addressId;
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return DataConstants.SIZE_LONG + DataConstants.SIZE_SHORT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer) {
|
||||
buffer.writeLong(addressId);
|
||||
buffer.writeShort(status.id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer) {
|
||||
this.addressId = buffer.readLong();
|
||||
short shortStatus = buffer.readShort();
|
||||
this.status = AddressQueueStatus.fromID(shortStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AddressStatusEncoding{" + "status=" + status + ", id=" + addressId + '}';
|
||||
}
|
||||
|
||||
}
|
|
@ -32,6 +32,7 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
|
|||
public SimpleString name;
|
||||
|
||||
public boolean autoCreated;
|
||||
public AddressStatusEncoding addressStatusEncoding;
|
||||
|
||||
public EnumSet<RoutingType> routingTypes;
|
||||
|
||||
|
@ -82,6 +83,15 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
|
|||
return routingTypes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddressStatusEncoding getAddressStatusEncoding() {
|
||||
return addressStatusEncoding;
|
||||
}
|
||||
|
||||
public void setAddressStatusEncoding(AddressStatusEncoding addressStatusEncoding) {
|
||||
this.addressStatusEncoding = addressStatusEncoding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(final ActiveMQBuffer buffer) {
|
||||
name = buffer.readSimpleString();
|
||||
|
|
|
@ -18,16 +18,16 @@
|
|||
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class QueueStatusEncoding extends QueueEncoding {
|
||||
|
||||
private QueueStatus status;
|
||||
private AddressQueueStatus status;
|
||||
|
||||
private long id;
|
||||
|
||||
public QueueStatusEncoding(long queueID, QueueStatus status) {
|
||||
public QueueStatusEncoding(long queueID, AddressQueueStatus status) {
|
||||
super(queueID);
|
||||
this.status = status;
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ public class QueueStatusEncoding extends QueueEncoding {
|
|||
public void decode(final ActiveMQBuffer buffer) {
|
||||
super.decode(buffer);
|
||||
short shortStatus = buffer.readShort();
|
||||
this.status = QueueStatus.fromID(shortStatus);
|
||||
this.status = AddressQueueStatus.fromID(shortStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -49,7 +49,7 @@ public class QueueStatusEncoding extends QueueEncoding {
|
|||
buffer.writeShort(status.id);
|
||||
}
|
||||
|
||||
public QueueStatus getStatus() {
|
||||
public AddressQueueStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
|
|||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
|
||||
|
@ -88,7 +88,7 @@ public class NullStorageManager implements StorageManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
|
||||
public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -101,6 +101,16 @@ public class NullStorageManager implements StorageManager {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAddressStatus(long recordID) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
|
||||
|
||||
|
|
|
@ -430,6 +430,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
@Message(id = 221080, value = "Deploying address {0} supporting {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void deployAddress(String addressName, String routingTypes);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221081, value = "There is no address with ID {0}, deleting record {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void infoNoAddressWithID(Long id, Long record);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222000, value = "ActiveMQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
|
|
|
@ -24,6 +24,12 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.management.AddressControl;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames;
|
||||
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
|
@ -32,6 +38,7 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
|
|||
public class AddressInfo {
|
||||
|
||||
private long id;
|
||||
private long pauseStatusRecord = -1;
|
||||
|
||||
private final SimpleString name;
|
||||
|
||||
|
@ -53,6 +60,11 @@ public class AddressInfo {
|
|||
|
||||
private long bindingRemovedTimestamp = -1;
|
||||
|
||||
private volatile boolean paused = false;
|
||||
|
||||
private PostOffice postOffice;
|
||||
private StorageManager storageManager;
|
||||
|
||||
public AddressInfo(SimpleString name) {
|
||||
this(name, EnumSet.noneOf(RoutingType.class));
|
||||
}
|
||||
|
@ -136,21 +148,94 @@ public class AddressInfo {
|
|||
this.bindingRemovedTimestamp = bindingRemovedTimestamp;
|
||||
}
|
||||
|
||||
public synchronized void pause(boolean persist) {
|
||||
if (postOffice == null) {
|
||||
throw new IllegalStateException("");
|
||||
}
|
||||
if (storageManager == null && persist) {
|
||||
throw new IllegalStateException("");
|
||||
}
|
||||
if (this.paused) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (persist) {
|
||||
this.pauseStatusRecord = storageManager.storeAddressStatus(this.getId(), AddressQueueStatus.PAUSED);
|
||||
}
|
||||
Bindings bindings = postOffice.lookupBindingsForAddress(this.getName());
|
||||
if (bindings != null) {
|
||||
for (Binding binding : bindings.getBindings()) {
|
||||
if (binding instanceof QueueBinding) {
|
||||
((QueueBinding) binding).getQueue().pause(persist);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.paused = true;
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void resume() {
|
||||
if (postOffice == null) {
|
||||
throw new IllegalStateException("");
|
||||
}
|
||||
if (storageManager == null && this.pauseStatusRecord > 0) {
|
||||
throw new IllegalStateException("");
|
||||
}
|
||||
if (!this.paused) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (this.pauseStatusRecord > 0) {
|
||||
storageManager.deleteAddressStatus(this.pauseStatusRecord);
|
||||
}
|
||||
Bindings bindings = postOffice.lookupBindingsForAddress(this.getName());
|
||||
if (bindings != null) {
|
||||
for (Binding binding : bindings.getBindings()) {
|
||||
if (binding instanceof QueueBinding) {
|
||||
((QueueBinding) binding).getQueue().resume();
|
||||
}
|
||||
}
|
||||
}
|
||||
this.paused = false;
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
boolean isPersisted() {
|
||||
return this.paused && this.pauseStatusRecord > 0;
|
||||
}
|
||||
|
||||
public boolean isPaused() {
|
||||
return this.paused;
|
||||
}
|
||||
|
||||
public void setPostOffice(PostOffice postOffice) {
|
||||
this.postOffice = postOffice;
|
||||
}
|
||||
|
||||
public void setStorageManager(StorageManager storageManager) {
|
||||
this.storageManager = storageManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buff = new StringBuffer();
|
||||
buff.append("Address [name=" + name);
|
||||
buff.append(", id=" + id);
|
||||
StringBuilder buff = new StringBuilder();
|
||||
buff.append("Address [name=").append(name);
|
||||
buff.append(", id=").append(id);
|
||||
buff.append(", routingTypes={");
|
||||
for (RoutingType routingType : getRoutingTypes()) {
|
||||
buff.append(routingType.toString() + ",");
|
||||
buff.append(routingType.toString()).append(",");
|
||||
}
|
||||
// delete hanging comma
|
||||
if (buff.charAt(buff.length() - 1) == ',') {
|
||||
buff.deleteCharAt(buff.length() - 1);
|
||||
}
|
||||
buff.append("}");
|
||||
buff.append(", autoCreated=" + autoCreated);
|
||||
buff.append(", autoCreated=").append(autoCreated);
|
||||
buff.append(", paused=").append(paused);
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
@ -166,6 +251,9 @@ public class AddressInfo {
|
|||
public AddressInfo create(SimpleString name, RoutingType routingType) {
|
||||
AddressInfo info = new AddressInfo(name, routingType);
|
||||
info.setInternal(this.internal);
|
||||
if (paused) {
|
||||
info.pause(this.pauseStatusRecord > 0);
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.paging.impl.Page;
|
|||
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord;
|
||||
|
@ -170,7 +170,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||
|
||||
if (queueBindingInfo.getQueueStatusEncodings() != null) {
|
||||
for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
|
||||
if (encoding.getStatus() == QueueStatus.PAUSED)
|
||||
if (encoding.getStatus() == AddressQueueStatus.PAUSED)
|
||||
queue.reloadPause(encoding.getId());
|
||||
}
|
||||
}
|
||||
|
@ -193,6 +193,11 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||
|
||||
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes());
|
||||
addressInfo.setId(addressBindingInfo.getId());
|
||||
if (addressBindingInfo.getAddressStatusEncoding() != null && addressBindingInfo.getAddressStatusEncoding().getStatus() == AddressQueueStatus.PAUSED) {
|
||||
addressInfo.setStorageManager(storageManager);
|
||||
addressInfo.setPostOffice(postOffice);
|
||||
addressInfo.pause(true);
|
||||
}
|
||||
postOffice.reloadAddressInfo(addressInfo);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
|||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
|
@ -578,6 +578,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
this.factory = factory;
|
||||
|
||||
registerMeters();
|
||||
if (this.addressInfo != null && this.addressInfo.isPaused()) {
|
||||
this.pause(this.addressInfo.isPersisted());
|
||||
}
|
||||
}
|
||||
|
||||
// Bindable implementation -------------------------------------------------------------------------------------
|
||||
|
@ -2368,7 +2371,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
if (pauseStatusRecord >= 0) {
|
||||
storageManager.deleteQueueStatus(pauseStatusRecord);
|
||||
}
|
||||
pauseStatusRecord = storageManager.storeQueueStatus(this.id, QueueStatus.PAUSED);
|
||||
pauseStatusRecord = storageManager.storeQueueStatus(this.id, AddressQueueStatus.PAUSED);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.unableToPauseQueue(e);
|
||||
|
|
|
@ -40,10 +40,10 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
|
|||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
|
||||
|
@ -251,7 +251,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
|
||||
public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -713,5 +713,15 @@ public class TransactionImplTest extends ActiveMQTestBase {
|
|||
public long getCurrentID() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAddressStatus(long recordID) throws Exception {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -135,6 +135,13 @@ Individual addresses can be managed using the `AddressControl` interface.
|
|||
`removeRole()` methods. You can list all the roles associated to the queue with
|
||||
the `getRoles()` method
|
||||
|
||||
- Pausing and resuming Address
|
||||
|
||||
The `AddressControl` can pause and resume an address and all the queues that
|
||||
are bound to it. Newly added queue will be paused too until the address is resumed.
|
||||
Thus all messages sent to the address will be recived but not delivered. When it is
|
||||
resumed, delivering will occur again.
|
||||
|
||||
#### Queue Management
|
||||
|
||||
The bulk of the management API deals with queues. The `QueueControl` interface
|
||||
|
|
|
@ -35,6 +35,7 @@ public class GroovyRun {
|
|||
public static final String TWO_FOUR = "ARTEMIS-240";
|
||||
public static final String TWO_SIX_THREE = "ARTEMIS-263";
|
||||
public static final String TWO_SEVEN_ZERO = "ARTEMIS-270";
|
||||
public static final String TWO_NINE_ZERO = "ARTEMIS-290";
|
||||
public static final String HORNETQ_235 = "HORNETQ-235";
|
||||
public static final String HORNETQ_247 = "HORNETQ-247";
|
||||
|
||||
|
@ -115,7 +116,7 @@ public class GroovyRun {
|
|||
}
|
||||
|
||||
public static void assertEquals(Object value1, Object value2) {
|
||||
if (!value1.equals(value2)) {
|
||||
if ((value1 == null && value2 == null) || !value1.equals(value2)) {
|
||||
throw new RuntimeException(value1 + "!=" + value2);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
package journalcompatibility
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.AddressControl
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
String serverType = arg[0];
|
||||
AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic");
|
||||
GroovyRun.assertNotNull(addressControl)
|
||||
GroovyRun.assertTrue(addressControl.isPaused())
|
|
@ -0,0 +1,26 @@
|
|||
package journalcompatibility
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.AddressControl
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
String serverType = arg[0];
|
||||
AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic");
|
||||
GroovyRun.assertNotNull(addressControl)
|
||||
GroovyRun.assertTrue(!addressControl.isPaused())
|
|
@ -0,0 +1,26 @@
|
|||
package journalcompatibility
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.AddressControl
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic");
|
||||
GroovyRun.assertTrue(!addressControl.isPaused())
|
||||
addressControl.pause(true)
|
||||
GroovyRun.assertTrue(addressControl.isPaused())
|
|
@ -0,0 +1,25 @@
|
|||
package journalcompatibility
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.core.server.Queue
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
|
||||
queue.pause(true);
|
|
@ -0,0 +1,25 @@
|
|||
package journalcompatibility
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.core.server.Queue
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
|
||||
GroovyRun.assertTrue(queue.isPaused())
|
|
@ -0,0 +1,25 @@
|
|||
package journalcompatibility
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.core.server.Queue
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
|
||||
GroovyRun.assertTrue(!queue.isPaused())
|
|
@ -0,0 +1,48 @@
|
|||
package journalcompatibility
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
import org.apache.activemq.artemis.api.core.management.AddressControl
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo
|
||||
|
||||
import javax.jms.*
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
try {
|
||||
cf.setEnable1xPrefixes(true);
|
||||
} catch (Throwable totallyIgnored) {
|
||||
// older versions will not have this method, dont even bother about seeing the stack trace or exception
|
||||
}
|
||||
Connection connection = cf.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTopic("jms.topic.MyTopic");
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
MessageConsumer messageConsumer1 = session.createConsumer(topic);
|
||||
MessageConsumer messageConsumer2 = session.createConsumer(topic);
|
||||
TextMessage message = session.createTextMessage("This is a text message");
|
||||
System.out.println("Sent message: " + message.getText());
|
||||
producer.send(message);
|
||||
connection.start();
|
||||
TextMessage messageReceived = (TextMessage) messageConsumer1.receive(5000);
|
||||
GroovyRun.assertNotNull(messageReceived);
|
||||
System.out.println("Consumer 1 Received message: " + messageReceived.getText());
|
||||
messageReceived = (TextMessage) messageConsumer2.receive(5000);
|
||||
GroovyRun.assertNotNull(messageReceived);
|
||||
System.out.println("Consumer 2 Received message: " + messageReceived.getText());
|
|
@ -0,0 +1,26 @@
|
|||
package journalcompatibility
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.AddressControl
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic");
|
||||
GroovyRun.assertTrue(addressControl.isPaused())
|
||||
addressControl.resume()
|
||||
GroovyRun.assertTrue(!addressControl.isPaused())
|
|
@ -0,0 +1,25 @@
|
|||
package journalcompatibility
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.core.server.Queue
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
|
||||
queue.resume();
|
|
@ -0,0 +1,82 @@
|
|||
package journalcompatibility
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
import org.apache.activemq.artemis.api.core.management.AddressControl
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo
|
||||
|
||||
import javax.jms.*
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
try {
|
||||
cf.setEnable1xPrefixes(true);
|
||||
} catch (Throwable totallyIgnored) {
|
||||
// older versions will not have this method, dont even bother about seeing the stack trace or exception
|
||||
}
|
||||
Connection connection = cf.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTopic("jms.topic.MyTopic");
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
MessageConsumer messageConsumer1 = session.createConsumer(topic);
|
||||
MessageConsumer messageConsumer2 = session.createConsumer(topic);
|
||||
TextMessage message = session.createTextMessage("This is a text message");
|
||||
System.out.println("Sent message: " + message.getText());
|
||||
AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic");
|
||||
addressControl.pause(true);
|
||||
producer.send(message);
|
||||
connection.start();
|
||||
TextMessage messageReceived = (TextMessage) messageConsumer1.receive(5000);
|
||||
GroovyRun.assertNull(messageReceived);
|
||||
messageReceived = (TextMessage) messageConsumer2.receive(5000);
|
||||
GroovyRun.assertNull(messageReceived);
|
||||
|
||||
//
|
||||
//
|
||||
//
|
||||
//ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
//try {
|
||||
// cf.setEnable1xPrefixes(true);
|
||||
//} catch (Throwable totallyIgnored) {
|
||||
// // older versions will not have this method, dont even bother about seeing the stack trace or exception
|
||||
//}
|
||||
//Connection connection = cf.createConnection();
|
||||
//connection.setClientID("myClientID");
|
||||
//Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
//connection.start();
|
||||
//
|
||||
//Topic topic = session.createTopic("jms.topic.MyTopic");
|
||||
//TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1");
|
||||
//
|
||||
//MessageProducer producer = session.createProducer(topic);
|
||||
//producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
//
|
||||
//AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic");
|
||||
//
|
||||
//addressControl.pause(true)
|
||||
//for (int i = 0; i < 100; i++) {
|
||||
// TextMessage mess = session.createTextMessage("msg" + i);
|
||||
// producer.send(mess);
|
||||
//}
|
||||
//for (int i = 0; i < 100; i++) {
|
||||
// TextMessage m = (TextMessage) subscriber1.receive(500)
|
||||
// GroovyRun.assertNull(m)
|
||||
//}
|
||||
//session.close();
|
||||
//connection.close();
|
|
@ -0,0 +1,145 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.compatibility;
|
||||
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_NINE_ZERO;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
|
||||
import org.apache.activemq.artemis.utils.FileUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
/**
|
||||
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
|
||||
*
|
||||
* cd /compatibility-tests
|
||||
* mvn install -Ptests | tee output.log
|
||||
*
|
||||
* on the output.log you will see the output generated by {@link #getClasspath(String)}
|
||||
*
|
||||
* On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test.
|
||||
* On Idea you would do the following:
|
||||
*
|
||||
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class AddressPauseJournalCompatibilityTest extends VersionedBase {
|
||||
|
||||
// this will ensure that all tests in this class are run twice,
|
||||
// once with "true" passed to the class' constructor and once with "false"
|
||||
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
|
||||
public static Collection getParameters() {
|
||||
// we don't need every single version ever released..
|
||||
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
|
||||
List<Object[]> combinations = new ArrayList<>();
|
||||
|
||||
/*
|
||||
// during development sometimes is useful to comment out the combinations
|
||||
// and add the ones you are interested.. example:
|
||||
*/
|
||||
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
|
||||
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
|
||||
|
||||
combinations.add(new Object[]{null, TWO_NINE_ZERO, SNAPSHOT});
|
||||
combinations.add(new Object[]{null, SNAPSHOT, TWO_NINE_ZERO});
|
||||
// the purpose on this one is just to validate the test itself.
|
||||
/// if it can't run against itself it won't work at all
|
||||
combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT});
|
||||
return combinations;
|
||||
}
|
||||
|
||||
public AddressPauseJournalCompatibilityTest(String server, String sender, String receiver) throws Exception {
|
||||
super(server, sender, receiver);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void removeFolder() throws Throwable {
|
||||
FileUtil.deleteDirectory(serverFolder.getRoot());
|
||||
serverFolder.getRoot().mkdirs();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
try {
|
||||
stopServer(serverClassloader);
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
try {
|
||||
stopServer(receiverClassloader);
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendReceive() throws Throwable {
|
||||
setVariable(senderClassloader, "persistent", true);
|
||||
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true);
|
||||
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
|
||||
evaluate(senderClassloader, "journalcompatibility/queue_isrunning.groovy");
|
||||
evaluate(senderClassloader, "journalcompatibility/pause_queue.groovy");
|
||||
evaluate(senderClassloader, "journalcompatibility/queue_ispaused.groovy");
|
||||
stopServer(senderClassloader);
|
||||
|
||||
setVariable(receiverClassloader, "persistent", true);
|
||||
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);
|
||||
|
||||
setVariable(receiverClassloader, "latch", null);
|
||||
evaluate(receiverClassloader, "journalcompatibility/queue_ispaused.groovy");
|
||||
evaluate(receiverClassloader, "journalcompatibility/resume_queue.groovy");
|
||||
evaluate(receiverClassloader, "journalcompatibility/resume_queue.groovy");
|
||||
evaluate(receiverClassloader, "journalcompatibility/queue_isrunning.groovy");
|
||||
evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendReceiveTopic() throws Throwable {
|
||||
if (TWO_NINE_ZERO.equals(this.sender)) {
|
||||
return;
|
||||
}
|
||||
setVariable(senderClassloader, "persistent", true);
|
||||
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true);
|
||||
evaluate(senderClassloader, "journalcompatibility/receiveMessagesAddress.groovy", server, sender);
|
||||
evaluate(senderClassloader, "journalcompatibility/sendMessagesAddress.groovy", server, sender);
|
||||
evaluate(senderClassloader, "journalcompatibility/address_ispaused.groovy", server);
|
||||
evaluate(senderClassloader, "journalcompatibility/resume_address.groovy");
|
||||
evaluate(senderClassloader, "journalcompatibility/address_isrunning.groovy", server);
|
||||
evaluate(senderClassloader, "journalcompatibility/pause_address.groovy");
|
||||
evaluate(senderClassloader, "journalcompatibility/address_ispaused.groovy", server);
|
||||
stopServer(senderClassloader);
|
||||
|
||||
setVariable(receiverClassloader, "persistent", true);
|
||||
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);
|
||||
stopServer(receiverClassloader);
|
||||
|
||||
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true);
|
||||
evaluate(senderClassloader, "journalcompatibility/address_ispaused.groovy", server);
|
||||
evaluate(senderClassloader, "journalcompatibility/resume_address.groovy");
|
||||
evaluate(senderClassloader, "journalcompatibility/address_isrunning.groovy", server);
|
||||
evaluate(senderClassloader, "journalcompatibility/receiveMessagesAddress.groovy", server, sender);
|
||||
stopServer(senderClassloader);
|
||||
}
|
||||
}
|
||||
|
|
@ -53,5 +53,24 @@ public class AddressConfigTest extends ActiveMQTestBase {
|
|||
Set<RoutingType> routingTypeSet = new HashSet<>();
|
||||
routingTypeSet.add(RoutingType.MULTICAST);
|
||||
assertEquals(routingTypeSet, addressInfo.getRoutingTypes());
|
||||
assertFalse(addressInfo.isPaused());
|
||||
|
||||
addressInfo.setPostOffice(server.getPostOffice());
|
||||
addressInfo.setStorageManager(server.getStorageManager());
|
||||
addressInfo.pause(true);
|
||||
assertTrue(addressInfo.isPaused());
|
||||
long id = addressInfo.getId();
|
||||
server.stop();
|
||||
server.start();
|
||||
addressInfo = server.getAddressInfo(SimpleString.toSimpleString("myAddress"));
|
||||
assertNotNull(addressInfo);
|
||||
routingTypeSet = new HashSet<>();
|
||||
routingTypeSet.add(RoutingType.MULTICAST);
|
||||
assertEquals(routingTypeSet, addressInfo.getRoutingTypes());
|
||||
assertEquals(id, addressInfo.getId());
|
||||
assertTrue(addressInfo.isPaused());
|
||||
addressInfo.setPostOffice(server.getPostOffice());
|
||||
addressInfo.setStorageManager(server.getStorageManager());
|
||||
addressInfo.resume();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
* Copyright 2019 The Apache Software Foundation.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import org.apache.activemq.artemis.api.core.management.AddressControl;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AddressPauseTest extends JMSTestBase {
|
||||
|
||||
@Test
|
||||
public void testPauseAddress() throws Exception {
|
||||
try (Connection connection = cf.createConnection()) {
|
||||
connection.setClientID("myClientID");
|
||||
connection.start();
|
||||
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
|
||||
Topic topic = session.createTopic("jms.topic.MyTopic");
|
||||
TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1");
|
||||
AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic");
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
final int numMessages = 100;
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
TextMessage mess = session.createTextMessage("msg" + i);
|
||||
producer.send(mess);
|
||||
}
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
TextMessage m = (TextMessage) subscriber1.receive(5000);
|
||||
Assert.assertNotNull(m);
|
||||
}
|
||||
//Pausing the subscriptions
|
||||
addressControl.pause();
|
||||
Assert.assertTrue(addressControl.isPaused());
|
||||
//subscriber2 should be paused too
|
||||
TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, "my-subscription2");
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
TextMessage mess = session.createTextMessage("msg" + i);
|
||||
producer.send(mess);
|
||||
}
|
||||
TextMessage message = (TextMessage) subscriber1.receive(5000);
|
||||
Assert.assertNull(message);
|
||||
message = (TextMessage) subscriber2.receive(5000);
|
||||
Assert.assertNull(message);
|
||||
//Resuming the subscriptions
|
||||
addressControl.resume();
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
TextMessage m = (TextMessage) subscriber1.receive(5000);
|
||||
Assert.assertNotNull(m);
|
||||
}
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
TextMessage m = (TextMessage) subscriber2.receive(5000);
|
||||
Assert.assertNotNull(m);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -55,7 +55,7 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
|
|||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
|
||||
|
@ -624,7 +624,7 @@ public class SendAckFailTest extends SpawnedTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
|
||||
public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception {
|
||||
return manager.storeQueueStatus(queueID, status);
|
||||
}
|
||||
|
||||
|
@ -633,6 +633,17 @@ public class SendAckFailTest extends SpawnedTestBase {
|
|||
manager.deleteQueueStatus(recordID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception {
|
||||
return manager.storeAddressStatus(addressID, status);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void deleteAddressStatus(long recordID) throws Exception {
|
||||
manager.deleteAddressStatus(recordID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception {
|
||||
manager.addAddressBinding(tx, addressInfo);
|
||||
|
|
|
@ -113,6 +113,26 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
|
|||
return (long) proxy.retrieveAttributeValue("unRoutedMessageCount");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause() throws Exception {
|
||||
proxy.invokeOperation("pause");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause(boolean persist) throws Exception {
|
||||
proxy.invokeOperation("pause", persist);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() throws Exception {
|
||||
proxy.invokeOperation("resume");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPaused() {
|
||||
return (boolean) proxy.retrieveAttributeValue("paused");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String sendMessage(Map<String, String> headers,
|
||||
int type,
|
||||
|
|
Loading…
Reference in New Issue