This commit is contained in:
Clebert Suconic 2019-07-19 08:52:28 -04:00
commit 5107eb9f91
30 changed files with 1234 additions and 67 deletions

View File

@ -135,4 +135,31 @@ public interface AddressControl {
@Parameter(name = "durable", desc = "Whether the message is durable") boolean durable,
@Parameter(name = "user", desc = "The user to authenticate with") String user,
@Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception;
/**
* Pauses all the queues bound to this address.Messages are no longer delivered to all its bounded queues.
* Newly added queue will be paused too until resume is called.
* @throws java.lang.Exception
*/
@Operation(desc = "Pauses the queues bound to this address", impact = MBeanOperationInfo.ACTION)
void pause() throws Exception;
/**
* Pauses all the queues bound to this address.Messages are no longer delivered to all its bounded queues.Newly added queue will be paused too until resume is called.
* @param persist if true, the pause state will be persisted.
* @throws java.lang.Exception
*/
@Operation(desc = "Pauses the queues bound to this address", impact = MBeanOperationInfo.ACTION)
void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception;
/**
* Resume all the queues bound of this address.Messages are delivered again to all its bounded queues.
* @throws java.lang.Exception
*/
@Operation(desc = "Resumes the queues bound to this address", impact = MBeanOperationInfo.ACTION)
void resume() throws Exception;
@Attribute(desc = "indicates if the queues bound to this address are paused")
boolean isPaused();
}

View File

@ -132,8 +132,6 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
if (AuditLogger.isEnabled()) {
AuditLogger.getQueueNames(this.addressInfo);
}
String[] result;
clearIO();
try {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
@ -373,6 +371,54 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
return MBeanInfoHelper.getMBeanAttributesInfo(AddressControl.class);
}
@Override
public void pause() {
pause(false);
}
@Override
public void pause(boolean persist) {
if (AuditLogger.isEnabled()) {
AuditLogger.pause(addressInfo);
}
checkStarted();
clearIO();
try {
addressInfo.setPostOffice(server.getPostOffice());
addressInfo.setStorageManager(server.getStorageManager());
addressInfo.pause(persist);
} finally {
blockOnIO();
}
}
@Override
public void resume() {
if (AuditLogger.isEnabled()) {
AuditLogger.resume(addressInfo);
}
checkStarted();
clearIO();
try {
addressInfo.setPostOffice(server.getPostOffice());
addressInfo.setStorageManager(server.getStorageManager());
addressInfo.resume();
} finally {
blockOnIO();
}
}
@Override
public boolean isPaused() {
if (AuditLogger.isEnabled()) {
AuditLogger.isPaused(this.addressInfo);
}
return addressInfo.isPaused();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@ -409,6 +455,12 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
}
}
private void checkStarted() {
if (!server.getPostOffice().isStarted()) {
throw new IllegalStateException("Broker is not started. Queues can not be managed yet");
}
}
// Inner classes -------------------------------------------------
private enum DurabilityType {

View File

@ -20,6 +20,7 @@ import java.util.EnumSet;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AddressStatusEncoding;
public interface AddressBindingInfo {
@ -28,4 +29,6 @@ public interface AddressBindingInfo {
SimpleString getName();
EnumSet<RoutingType> getRoutingTypes();
AddressStatusEncoding getAddressStatusEncoding();
}

View File

@ -17,30 +17,29 @@
package org.apache.activemq.artemis.core.persistence;
public enum QueueStatus {
public enum AddressQueueStatus {
PAUSED((short) 0), RUNNING((short) 1);
public final short id;
QueueStatus(short id) {
AddressQueueStatus(short id) {
this.id = id;
}
public static QueueStatus[] values;
public static AddressQueueStatus[] values;
static {
QueueStatus[] allValues = QueueStatus.values();
values = new QueueStatus[allValues.length];
for (QueueStatus v : allValues) {
AddressQueueStatus[] allValues = AddressQueueStatus.values();
values = new AddressQueueStatus[allValues.length];
for (AddressQueueStatus v : allValues) {
values[v.id] = v;
}
}
public static QueueStatus fromID(short id) {
public static AddressQueueStatus fromID(short id) {
if (id < 0 || id >= values.length) {
return null;
} else {
return values[id];
}
return values[id];
}
}

View File

@ -311,10 +311,14 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
* @return the id of the journal
* @throws Exception
*/
long storeQueueStatus(long queueID, QueueStatus status) throws Exception;
long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception;
void deleteQueueStatus(long recordID) throws Exception;
long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception;
void deleteAddressStatus(long recordID) throws Exception;
void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception;
void deleteAddressBinding(long tx, long addressBindingID) throws Exception;

View File

@ -69,11 +69,12 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AddressStatusEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeleteEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
@ -1322,7 +1323,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
@Override
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception {
long recordID = idGenerator.generateID();
readLock();
@ -1346,6 +1347,31 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
}
@Override
public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception {
long recordID = idGenerator.generateID();
readLock();
try {
bindingsJournal.appendAddRecord(recordID, JournalRecordIds.ADDRESS_STATUS_RECORD, new AddressStatusEncoding(addressID, status), true);
} finally {
readUnLock();
}
return recordID;
}
@Override
public void deleteAddressStatus(long recordID) throws Exception {
readLock();
try {
bindingsJournal.appendDeleteRecord(recordID, true);
} finally {
readUnLock();
}
}
@Override
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(),
@ -1465,6 +1491,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null);
HashMap<Long, PersistentQueueBindingEncoding> mapBindings = new HashMap<>();
HashMap<Long, PersistentAddressBindingEncoding> mapAddressBindings = new HashMap<>();
for (RecordInfo record : records) {
long id = record.id;
@ -1481,6 +1508,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
} else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) {
PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer);
addressBindingInfos.add(bindingEncoding);
mapAddressBindings.put(id, bindingEncoding);
} else if (rec == JournalRecordIds.GROUP_RECORD) {
GroupingEncoding encoding = newGroupEncoding(id, buffer);
groupingInfos.add(encoding);
@ -1500,6 +1528,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
ActiveMQServerLogger.LOGGER.infoNoQueueWithID(statusEncoding.queueID, statusEncoding.getId());
this.deleteQueueStatus(statusEncoding.getId());
}
} else if (rec == JournalRecordIds.ADDRESS_STATUS_RECORD) {
AddressStatusEncoding statusEncoding = newAddressStatusEncoding(id, buffer);
PersistentAddressBindingEncoding addressBindingEncoding = mapAddressBindings.get(statusEncoding.getAddressId());
if (addressBindingEncoding != null) {
addressBindingEncoding.setAddressStatusEncoding(statusEncoding);
} else {
// unlikely to happen, so I didn't bother about the Logger method
ActiveMQServerLogger.LOGGER.infoNoAddressWithID(statusEncoding.getAddressId(), statusEncoding.getId());
this.deleteAddressStatus(statusEncoding.getId());
}
} else {
// unlikely to happen
ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new Exception("invalid record type " + rec));
@ -1962,6 +2000,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return setting;
}
static AddressStatusEncoding newAddressStatusEncoding(long id, ActiveMQBuffer buffer) {
AddressStatusEncoding addressStatus = new AddressStatusEncoding();
addressStatus.decode(buffer);
addressStatus.setId(id);
return addressStatus;
}
/**
* @param id
* @param buffer

View File

@ -72,6 +72,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_STATUS_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE;
@ -656,6 +657,9 @@ public final class DescribeJournal {
case ADDRESS_BINDING_RECORD:
return AbstractJournalStorageManager.newAddressBindingEncoding(id, buffer);
case ADDRESS_STATUS_RECORD:
return AbstractJournalStorageManager.newAddressStatusEncoding(id, buffer);
default:
return null;
}

View File

@ -88,4 +88,6 @@ public final class JournalRecordIds {
public static final byte ADD_MESSAGE_PROTOCOL = 45;
public static final byte ADDRESS_STATUS_RECORD = 46;
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2019 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.utils.DataConstants;
public class AddressStatusEncoding implements EncodingSupport {
private AddressQueueStatus status;
private long addressId;
private long id;
public AddressStatusEncoding(long addressId, AddressQueueStatus status) {
this.status = status;
this.addressId = addressId;
}
public AddressStatusEncoding() {
}
public AddressQueueStatus getStatus() {
return status;
}
public void setStatus(AddressQueueStatus status) {
this.status = status;
}
public long getAddressId() {
return addressId;
}
public void setAddressId(long addressId) {
this.addressId = addressId;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
@Override
public int getEncodeSize() {
return DataConstants.SIZE_LONG + DataConstants.SIZE_SHORT;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(addressId);
buffer.writeShort(status.id);
}
@Override
public void decode(ActiveMQBuffer buffer) {
this.addressId = buffer.readLong();
short shortStatus = buffer.readShort();
this.status = AddressQueueStatus.fromID(shortStatus);
}
@Override
public String toString() {
return "AddressStatusEncoding{" + "status=" + status + ", id=" + addressId + '}';
}
}

View File

@ -32,6 +32,7 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
public SimpleString name;
public boolean autoCreated;
public AddressStatusEncoding addressStatusEncoding;
public EnumSet<RoutingType> routingTypes;
@ -82,6 +83,15 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
return routingTypes;
}
@Override
public AddressStatusEncoding getAddressStatusEncoding() {
return addressStatusEncoding;
}
public void setAddressStatusEncoding(AddressStatusEncoding addressStatusEncoding) {
this.addressStatusEncoding = addressStatusEncoding;
}
@Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();

View File

@ -18,16 +18,16 @@
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.utils.DataConstants;
public class QueueStatusEncoding extends QueueEncoding {
private QueueStatus status;
private AddressQueueStatus status;
private long id;
public QueueStatusEncoding(long queueID, QueueStatus status) {
public QueueStatusEncoding(long queueID, AddressQueueStatus status) {
super(queueID);
this.status = status;
}
@ -40,7 +40,7 @@ public class QueueStatusEncoding extends QueueEncoding {
public void decode(final ActiveMQBuffer buffer) {
super.decode(buffer);
short shortStatus = buffer.readShort();
this.status = QueueStatus.fromID(shortStatus);
this.status = AddressQueueStatus.fromID(shortStatus);
}
@Override
@ -49,7 +49,7 @@ public class QueueStatusEncoding extends QueueEncoding {
buffer.writeShort(status.id);
}
public QueueStatus getStatus() {
public AddressQueueStatus getStatus() {
return status;
}

View File

@ -43,7 +43,7 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
@ -88,7 +88,7 @@ public class NullStorageManager implements StorageManager {
}
@Override
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception {
return 0;
}
@ -101,6 +101,16 @@ public class NullStorageManager implements StorageManager {
}
@Override
public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception {
return 0;
}
@Override
public void deleteAddressStatus(long recordID) throws Exception {
}
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {

View File

@ -430,6 +430,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 221080, value = "Deploying address {0} supporting {1}", format = Message.Format.MESSAGE_FORMAT)
void deployAddress(String addressName, String routingTypes);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221081, value = "There is no address with ID {0}, deleting record {1}", format = Message.Format.MESSAGE_FORMAT)
void infoNoAddressWithID(Long id, Long record);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222000, value = "ActiveMQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope",
format = Message.Format.MESSAGE_FORMAT)

View File

@ -24,6 +24,13 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.utils.CompositeAddress;
@ -32,6 +39,7 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
public class AddressInfo {
private long id;
private long pauseStatusRecord = -1;
private final SimpleString name;
@ -53,6 +61,11 @@ public class AddressInfo {
private long bindingRemovedTimestamp = -1;
private volatile boolean paused = false;
private PostOffice postOffice;
private StorageManager storageManager;
public AddressInfo(SimpleString name) {
this(name, EnumSet.noneOf(RoutingType.class));
}
@ -136,21 +149,128 @@ public class AddressInfo {
this.bindingRemovedTimestamp = bindingRemovedTimestamp;
}
public synchronized void reloadPause(long recordID) {
if (pauseStatusRecord >= 0) {
try {
storageManager.deleteAddressStatus(pauseStatusRecord);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToDeleteQueueStatus(e);
}
}
this.pauseStatusRecord = recordID;
try {
Bindings bindings = postOffice.lookupBindingsForAddress(this.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
((QueueBinding) binding).getQueue().pause(false);
}
}
}
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
this.paused = true;
}
public synchronized void pause(boolean persist) {
if (postOffice == null) {
throw new IllegalStateException("");
}
if (storageManager == null && persist) {
throw new IllegalStateException("");
}
try {
if (persist) {
if (pauseStatusRecord >= 0) {
try {
storageManager.deleteAddressStatus(pauseStatusRecord);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToDeleteQueueStatus(e);
}
}
this.pauseStatusRecord = storageManager.storeAddressStatus(this.getId(), AddressQueueStatus.PAUSED);
}
Bindings bindings = postOffice.lookupBindingsForAddress(this.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
((QueueBinding) binding).getQueue().pause(false);
}
}
}
this.paused = true;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public synchronized void resume() {
if (postOffice == null) {
throw new IllegalStateException("");
}
if (storageManager == null && this.pauseStatusRecord > 0) {
throw new IllegalStateException("");
}
if (!this.paused) {
return;
}
try {
if (this.pauseStatusRecord > 0) {
storageManager.deleteAddressStatus(this.pauseStatusRecord);
}
Bindings bindings = postOffice.lookupBindingsForAddress(this.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
((QueueBinding) binding).getQueue().resume();
}
}
}
this.paused = false;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
boolean isPersisted() {
return this.paused && this.pauseStatusRecord > 0;
}
public boolean isPaused() {
return this.paused;
}
public void setPostOffice(PostOffice postOffice) {
this.postOffice = postOffice;
}
public void setStorageManager(StorageManager storageManager) {
this.storageManager = storageManager;
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer();
buff.append("Address [name=" + name);
buff.append(", id=" + id);
StringBuilder buff = new StringBuilder();
buff.append("Address [name=").append(name);
buff.append(", id=").append(id);
buff.append(", routingTypes={");
for (RoutingType routingType : getRoutingTypes()) {
buff.append(routingType.toString() + ",");
buff.append(routingType.toString()).append(",");
}
// delete hanging comma
if (buff.charAt(buff.length() - 1) == ',') {
buff.deleteCharAt(buff.length() - 1);
}
buff.append("}");
buff.append(", autoCreated=" + autoCreated);
buff.append(", autoCreated=").append(autoCreated);
buff.append(", paused=").append(paused);
buff.append("]");
return buff.toString();
}
@ -166,6 +286,9 @@ public class AddressInfo {
public AddressInfo create(SimpleString name, RoutingType routingType) {
AddressInfo info = new AddressInfo(name, routingType);
info.setInternal(this.internal);
if (paused) {
info.pause(this.pauseStatusRecord > 0);
}
return info;
}

View File

@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord;
@ -170,7 +170,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
if (queueBindingInfo.getQueueStatusEncodings() != null) {
for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
if (encoding.getStatus() == QueueStatus.PAUSED)
if (encoding.getStatus() == AddressQueueStatus.PAUSED)
queue.reloadPause(encoding.getId());
}
}
@ -193,6 +193,11 @@ public class PostOfficeJournalLoader implements JournalLoader {
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes());
addressInfo.setId(addressBindingInfo.getId());
if (addressBindingInfo.getAddressStatusEncoding() != null && addressBindingInfo.getAddressStatusEncoding().getStatus() == AddressQueueStatus.PAUSED) {
addressInfo.setStorageManager(storageManager);
addressInfo.setPostOffice(postOffice);
addressInfo.reloadPause(addressBindingInfo.getAddressStatusEncoding().getId());
}
postOffice.reloadAddressInfo(addressInfo);
}
}

View File

@ -59,7 +59,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -578,6 +578,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.factory = factory;
registerMeters();
if (this.addressInfo != null && this.addressInfo.isPaused()) {
this.pause(false);
}
}
// Bindable implementation -------------------------------------------------------------------------------------
@ -2368,7 +2371,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (pauseStatusRecord >= 0) {
storageManager.deleteQueueStatus(pauseStatusRecord);
}
pauseStatusRecord = storageManager.storeQueueStatus(this.id, QueueStatus.PAUSED);
pauseStatusRecord = storageManager.storeQueueStatus(this.id, AddressQueueStatus.PAUSED);
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToPauseQueue(e);
@ -2394,7 +2397,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public synchronized boolean isPaused() {
return paused;
return paused || (addressInfo != null && addressInfo.isPaused());
}
@Override
@ -2546,7 +2549,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
synchronized (this) {
// Need to do these checks inside the synchronized
if (paused || !canDispatch() && redistributor == null) {
if (isPaused() || !canDispatch() && redistributor == null) {
return false;
}
@ -2754,7 +2757,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
depagePending = false;
synchronized (this) {
if (paused || pageIterator == null) {
if (isPaused() || pageIterator == null) {
return;
}
}
@ -3200,7 +3203,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (!supportsDirectDeliver) {
return false;
}
if (paused || !canDispatch() && redistributor == null) {
if (isPaused() || !canDispatch() && redistributor == null) {
return false;
}

View File

@ -40,10 +40,10 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
@ -251,7 +251,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception {
return 0;
}
@ -713,5 +713,15 @@ public class TransactionImplTest extends ActiveMQTestBase {
public long getCurrentID() {
return 0;
}
@Override
public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception {
return 0;
}
@Override
public void deleteAddressStatus(long recordID) throws Exception {
}
}
}

View File

@ -135,6 +135,13 @@ Individual addresses can be managed using the `AddressControl` interface.
`removeRole()` methods. You can list all the roles associated to the queue with
the `getRoles()` method
- Pausing and resuming Address
The `AddressControl` can pause and resume an address and all the queues that
are bound to it. Newly added queue will be paused too until the address is resumed.
Thus all messages sent to the address will be recived but not delivered. When it is
resumed, delivering will occur again.
#### Queue Management
The bulk of the management API deals with queues. The `QueueControl` interface

View File

@ -14,7 +14,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@ -261,8 +262,8 @@
<artifactId>jboss-logmanager</artifactId>
</dependency>
<dependency>
<groupId>org.wildfly.common</groupId>
<artifactId>wildfly-common</artifactId>
<groupId>org.wildfly.common</groupId>
<artifactId>wildfly-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
@ -325,7 +326,7 @@
<arg>org.apache.activemq:artemis-amqp-protocol:${project.version}</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:${project.version}</arg>
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
</libListWithDeps>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
@ -355,30 +356,59 @@
otherwise this is not captured, search for the word @@@@ on this pom where I left anothr comment -->
<variableName>ARTEMIS-263</variableName>
</configuration>
</execution> <execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>270-check</id>
<configuration>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.7.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.7.0</arg>
<arg>org.apache.activemq:artemis-cli:2.7.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.7.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.7.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.7.0</arg>
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<!-- for future maintainers, notice that if you add new variables you also need to add the system property
otherwise this is not captured, search for the word @@@@ on this pom where I left anothr comment -->
<variableName>ARTEMIS-270</variableName>
</configuration>
</execution>
</execution>
<execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>270-check</id>
<configuration>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.7.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.7.0</arg>
<arg>org.apache.activemq:artemis-cli:2.7.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.7.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.7.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.7.0</arg>
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<!-- for future maintainers, notice that if you add new variables you also need to add the system property
otherwise this is not captured, search for the word @@@@ on this pom where I left anothr comment -->
<variableName>ARTEMIS-270</variableName>
</configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>2_10_0-check</id>
<configuration>
<!-- At the time I'm writing this 2.10.0 is not released yet
This could be removed the day 2.10.0 is released.
I am adding this now to make sure we will run tests on 2.10.0 when it is released -->
<optional>true</optional>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.10.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.10.0</arg>
<arg>org.apache.activemq:artemis-cli:2.10.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.10.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.10.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.10.0</arg>
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<!-- for future maintainers, notice that if you add new variables you also need to add the system property
otherwise this is not captured, search for the word @@@@ on this pom where I left anothr comment -->
<variableName>ARTEMIS-2_10_0</variableName>
</configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>

View File

@ -35,6 +35,7 @@ public class GroovyRun {
public static final String TWO_FOUR = "ARTEMIS-240";
public static final String TWO_SIX_THREE = "ARTEMIS-263";
public static final String TWO_SEVEN_ZERO = "ARTEMIS-270";
public static final String TWO_TEN_ZERO = "ARTEMIS-2_10_0";
public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247";
@ -115,7 +116,7 @@ public class GroovyRun {
}
public static void assertEquals(Object value1, Object value2) {
if (!value1.equals(value2)) {
if ((value1 == null && value2 == null) || !value1.equals(value2)) {
throw new RuntimeException(value1 + "!=" + value2);
}
}

View File

@ -0,0 +1,69 @@
package addresspause
import org.apache.activemq.artemis.api.core.management.AddressControl
import org.apache.activemq.artemis.api.core.management.ResourceNames
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
String address = arg[0]
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100")
Connection connection = cf.createConnection();
connection.setClientID("myClientID");
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
Queue queue = session.createQueue("queue");
MessageConsumer consumer;
Destination destination;
if (address.equals("topic")) {
destination = topic;
TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1");
consumer = subscriber1;
} else {
destination = queue;
consumer = session.createConsumer(queue);
}
AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + address);
GroovyRun.assertNotNull(addressControl)
GroovyRun.assertTrue(addressControl.isPaused())
GroovyRun.assertNull(consumer.receiveNoWait());
int numMessages = 10;
addressControl.resume();
for (int i = 0; i < numMessages; i++) {
TextMessage m = (TextMessage) consumer.receive(5000);
GroovyRun.assertNotNull(m);
}
session.commit();
GroovyRun.assertNull(consumer.receiveNoWait());
connection.close();
GroovyRun.assertFalse(addressControl.isPaused())

View File

@ -0,0 +1,78 @@
package addresspause
import org.apache.activemq.artemis.api.core.management.AddressControl
import org.apache.activemq.artemis.api.core.management.ResourceNames
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
String address = arg[0]
AddressControl addressControl = (AddressControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.ADDRESS + address);
GroovyRun.assertNotNull(addressControl)
GroovyRun.assertFalse(addressControl.isPaused())
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100")
Connection connection = cf.createConnection();
connection.setClientID("myClientID");
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
Queue queue = session.createQueue("queue");
MessageConsumer consumer;
Destination destination;
if (address.equals("topic")) {
destination = topic;
TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1");
consumer = subscriber1;
} else {
destination = queue;
consumer = session.createConsumer(queue);
}
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
session.commit();
for (int i = 0; i < numMessages; i++) {
TextMessage m = (TextMessage) consumer.receive(5000);
GroovyRun.assertNotNull(m);
}
session.commit();
//Pausing the subscriptions
addressControl.pause(true);
for (int i = 0; i < numMessages; i++) {
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
session.commit();
GroovyRun.assertNull(consumer.receiveNoWait());
connection.close();

View File

@ -0,0 +1,70 @@
package queuepause
import org.apache.activemq.artemis.api.core.management.AddressControl
import org.apache.activemq.artemis.api.core.management.QueueControl
import org.apache.activemq.artemis.api.core.management.ResourceNames
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
String address = arg[0]
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100")
Connection connection = cf.createConnection();
connection.setClientID("myClientID");
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
Queue queue = session.createQueue("queue");
MessageConsumer consumer;
Destination destination;
if (address.equals("topic")) {
destination = topic;
TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1");
consumer = subscriber1;
} else {
destination = queue;
consumer = session.createConsumer(queue);
}
QueueControl addressControl = (QueueControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.QUEUE + address);
GroovyRun.assertNotNull(addressControl)
GroovyRun.assertTrue(addressControl.isPaused())
GroovyRun.assertNull(consumer.receiveNoWait());
int numMessages = 10;
addressControl.resume();
for (int i = 0; i < numMessages; i++) {
TextMessage m = (TextMessage) consumer.receive(5000);
GroovyRun.assertNotNull(m);
}
session.commit();
GroovyRun.assertNull(consumer.receiveNoWait());
connection.close();
GroovyRun.assertFalse(addressControl.isPaused())

View File

@ -0,0 +1,79 @@
package queuepause
import org.apache.activemq.artemis.api.core.management.AddressControl
import org.apache.activemq.artemis.api.core.management.QueueControl
import org.apache.activemq.artemis.api.core.management.ResourceNames
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
String address = arg[0]
QueueControl addressControl = (QueueControl) server.getJMSServerManager().getActiveMQServer().getManagementService().getResource(ResourceNames.QUEUE + address);
GroovyRun.assertNotNull(addressControl)
GroovyRun.assertFalse(addressControl.isPaused())
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100")
Connection connection = cf.createConnection();
connection.setClientID("myClientID");
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
Queue queue = session.createQueue("queue");
MessageConsumer consumer;
Destination destination;
if (address.equals("topic")) {
destination = topic;
TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1");
consumer = subscriber1;
} else {
destination = queue;
consumer = session.createConsumer(queue);
}
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
session.commit();
for (int i = 0; i < numMessages; i++) {
TextMessage m = (TextMessage) consumer.receive(5000);
GroovyRun.assertNotNull(m);
}
session.commit();
//Pausing the subscriptions
addressControl.pause(true);
for (int i = 0; i < numMessages; i++) {
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
session.commit();
GroovyRun.assertNull(consumer.receiveNoWait());
connection.close();

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TEN_ZERO;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
*
* cd /compatibility-tests
* mvn install -Ptests | tee output.log
*
* on the output.log you will see the output generated by {@link #getClasspath(String)}
*
* On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test.
* On Idea you would do the following:
*
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
@RunWith(Parameterized.class)
public class AddressPauseJournalCompatibilityTest extends VersionedBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
// we don't need every single version ever released..
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
List<Object[]> combinations = new ArrayList<>();
/*
// during development sometimes is useful to comment out the combinations
// and add the ones you are interested.. example:
*/
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
combinations.add(new Object[]{null, TWO_TEN_ZERO, SNAPSHOT});
// the purpose on this one is just to validate the test itself.
/// if it can't run against itself it won't work at all
combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT});
return combinations;
}
public AddressPauseJournalCompatibilityTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void removeFolder() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
serverFolder.getRoot().mkdirs();
}
@After
public void tearDown() {
try {
stopServer(serverClassloader);
} catch (Throwable ignored) {
}
try {
stopServer(receiverClassloader);
} catch (Throwable ignored) {
}
}
@Test
public void testSendReceiveTopic() throws Throwable {
internal("topic");
}
@Test
public void testSendReceiveQueue() throws Throwable {
internal("queue");
}
public void internal(String destinationName) throws Throwable {
setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true);
evaluate(senderClassloader, "addresspause/beforestop.groovy", destinationName);
stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);
evaluate(receiverClassloader, "addresspause/afterstop.groovy", destinationName);
stopServer(receiverClassloader);
// on a third try, we run the beforestop again, as the address should been in regular conditions when aftertop.groovy is finished
setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);
evaluate(receiverClassloader, "addresspause/beforestop.groovy", destinationName);
stopServer(receiverClassloader);
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_SIX_THREE;
/**
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
*
* cd /compatibility-tests
* mvn install -Ptests | tee output.log
*
* on the output.log you will see the output generated by {@link #getClasspath(String)}
*
* On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test.
* On Idea you would do the following:
*
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
@RunWith(Parameterized.class)
public class QueuePauseJournalCompatibilityTest extends VersionedBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
// we don't need every single version ever released..
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
List<Object[]> combinations = new ArrayList<>();
/*
// during development sometimes is useful to comment out the combinations
// and add the ones you are interested.. example:
*/
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
combinations.add(new Object[]{null, TWO_SIX_THREE, SNAPSHOT});
// the purpose on this one is just to validate the test itself.
/// if it can't run against itself it won't work at all
combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT});
return combinations;
}
public QueuePauseJournalCompatibilityTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void removeFolder() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
serverFolder.getRoot().mkdirs();
}
@After
public void tearDown() {
try {
stopServer(serverClassloader);
} catch (Throwable ignored) {
}
try {
stopServer(receiverClassloader);
} catch (Throwable ignored) {
}
}
@Test
public void testSendReceiveQueue() throws Throwable {
internal("queue");
}
public void internal(String destinationName) throws Throwable {
setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true);
evaluate(senderClassloader, "queuepause/beforestop.groovy", destinationName);
stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);
evaluate(receiverClassloader, "queuepause/afterstop.groovy", destinationName);
stopServer(receiverClassloader);
// on a third try, we run the beforestop again, as the address should been in regular conditions when aftertop.groovy is finished
setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);
evaluate(receiverClassloader, "queuepause/beforestop.groovy", destinationName);
}
}

View File

@ -53,5 +53,24 @@ public class AddressConfigTest extends ActiveMQTestBase {
Set<RoutingType> routingTypeSet = new HashSet<>();
routingTypeSet.add(RoutingType.MULTICAST);
assertEquals(routingTypeSet, addressInfo.getRoutingTypes());
assertFalse(addressInfo.isPaused());
addressInfo.setPostOffice(server.getPostOffice());
addressInfo.setStorageManager(server.getStorageManager());
addressInfo.pause(true);
assertTrue(addressInfo.isPaused());
long id = addressInfo.getId();
server.stop();
server.start();
addressInfo = server.getAddressInfo(SimpleString.toSimpleString("myAddress"));
assertNotNull(addressInfo);
routingTypeSet = new HashSet<>();
routingTypeSet.add(RoutingType.MULTICAST);
assertEquals(routingTypeSet, addressInfo.getRoutingTypes());
assertEquals(id, addressInfo.getId());
assertTrue(addressInfo.isPaused());
addressInfo.setPostOffice(server.getPostOffice());
addressInfo.setStorageManager(server.getStorageManager());
addressInfo.resume();
}
}

View File

@ -0,0 +1,156 @@
/*
* Copyright 2019 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Assert;
import org.junit.Test;
public class AddressPauseTest extends JMSTestBase {
@Override
protected boolean usePersistence() {
return true;
}
@Test
public void testPauseAddress() throws Exception {
try (Connection connection = cf.createConnection()) {
connection.setClientID("myClientID");
connection.start();
try (Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
Topic topic = session.createTopic("jms.topic.MyTopic");
TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1");
AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic");
MessageProducer producer = session.createProducer(topic);
final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
session.commit();
for (int i = 0; i < numMessages; i++) {
TextMessage m = (TextMessage) subscriber1.receive(5000);
Assert.assertNotNull(m);
}
session.commit();
//Pausing the subscriptions
addressControl.pause();
Assert.assertTrue(addressControl.isPaused());
//subscriber2 should be paused too
TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, "my-subscription2");
for (int i = 0; i < numMessages; i++) {
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
session.commit();
TextMessage message = (TextMessage) subscriber1.receiveNoWait();
Assert.assertNull(message);
message = (TextMessage) subscriber2.receiveNoWait();
Assert.assertNull(message);
//Resuming the subscriptions
addressControl.resume();
for (int i = 0; i < numMessages; i++) {
TextMessage m = (TextMessage) subscriber1.receive(5000);
Assert.assertNotNull(m);
}
session.commit();
for (int i = 0; i < numMessages; i++) {
TextMessage m = (TextMessage) subscriber2.receive(5000);
Assert.assertNotNull(m);
}
session.commit();
}
}
}
@Test
public void testPauseAddressServerRestart() throws Exception {
final int numMessages = 100;
try (Connection connection = cf.createConnection()) {
connection.setClientID("myClientID");
connection.start();
try (Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
Topic topic = session.createTopic("jms.topic.MyTopic");
TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1");
AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic");
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < numMessages; i++) {
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
session.commit();
for (int i = 0; i < numMessages; i++) {
TextMessage m = (TextMessage) subscriber1.receive(5000);
Assert.assertNotNull(m);
}
session.commit();
//Pausing the subscriptions
addressControl.pause(true);
}
}
server.stop();
server.start();
try (Connection connection = cf.createConnection()) {
connection.setClientID("myClientID");
connection.start();
try (Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
Topic topic = session.createTopic("jms.topic.MyTopic");
TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "my-subscription1");
AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + "jms.topic.MyTopic");
MessageProducer producer = session.createProducer(topic);
Assert.assertTrue(addressControl.isPaused());
//subscriber2 should be paused too
TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, "my-subscription2");
for (int i = 0; i < numMessages; i++) {
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
session.commit();
TextMessage message = (TextMessage) subscriber1.receiveNoWait();
Assert.assertNull(message);
message = (TextMessage) subscriber2.receiveNoWait();
Assert.assertNull(message);
//Resuming the subscriptions
addressControl.resume();
for (int i = 0; i < numMessages; i++) {
TextMessage m = (TextMessage) subscriber1.receive(5000);
Assert.assertNotNull(m);
}
session.commit();
for (int i = 0; i < numMessages; i++) {
TextMessage m = (TextMessage) subscriber2.receive(5000);
Assert.assertNotNull(m);
}
session.commit();
}
}
}
}

View File

@ -55,7 +55,7 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
@ -624,7 +624,7 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception {
return manager.storeQueueStatus(queueID, status);
}
@ -633,6 +633,17 @@ public class SendAckFailTest extends SpawnedTestBase {
manager.deleteQueueStatus(recordID);
}
@Override
public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception {
return manager.storeAddressStatus(addressID, status);
}
@Override
public void deleteAddressStatus(long recordID) throws Exception {
manager.deleteAddressStatus(recordID);
}
@Override
public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception {
manager.addAddressBinding(tx, addressInfo);

View File

@ -113,6 +113,26 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
return (long) proxy.retrieveAttributeValue("unRoutedMessageCount");
}
@Override
public void pause() throws Exception {
proxy.invokeOperation("pause");
}
@Override
public void pause(boolean persist) throws Exception {
proxy.invokeOperation("pause", persist);
}
@Override
public void resume() throws Exception {
proxy.invokeOperation("resume");
}
@Override
public boolean isPaused() {
return (boolean) proxy.retrieveAttributeValue("paused");
}
@Override
public String sendMessage(Map<String, String> headers,
int type,